Skip to content

Commit 10fce4d

Browse files
authored
Explicit closing of prepared statement portals in transactions to release table locks. (#393)
1 parent 96588bd commit 10fce4d

File tree

6 files changed

+109
-19
lines changed

6 files changed

+109
-19
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
# Changelog
22

3+
## 3.4.4
4+
5+
- Fix: explicit closing of prepared statement portals in transactions to release table locks. [#393](https://github.com/isoos/postgresql-dart/pull/393)
6+
37
## 3.4.3
48

59
- Fix: prevent hanging state by forwarding protocol-level parsing errors into the message stream.

lib/src/messages/client_messages.dart

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,9 @@ class DescribeMessage extends ClientMessage {
172172
buffer.writeUint8(_isPortal ? $P : $S);
173173
buffer.writeEncodedString(name); // Name of prepared statement
174174
}
175+
176+
@override
177+
String toString() => 'DescribeMessage(name=$_name, portal=$_isPortal)';
175178
}
176179

177180
class BindMessage extends ClientMessage {
@@ -248,6 +251,12 @@ class BindMessage extends ClientMessage {
248251
buffer.writeUint16(1);
249252
buffer.writeUint16(1);
250253
}
254+
255+
@override
256+
String toString() => 'BindMessage(${[
257+
if (_portalName.isNotEmpty) 'portal=$_portalName',
258+
if (_statementName.isNotEmpty) 'stmt=$_statementName',
259+
].join(',')})';
251260
}
252261

253262
class ExecuteMessage extends ClientMessage {
@@ -263,6 +272,9 @@ class ExecuteMessage extends ClientMessage {
263272
buffer.writeEncodedString(portalName);
264273
buffer.writeUint32(0);
265274
}
275+
276+
@override
277+
String toString() => 'ExecuteMessage(portal=$_portalName)';
266278
}
267279

268280
class CloseMessage extends ClientMessage {
@@ -288,6 +300,9 @@ class CloseMessage extends ClientMessage {
288300
..writeUint8(_isForPortal ? $P : $S);
289301
buffer.writeEncodedString(name);
290302
}
303+
304+
@override
305+
String toString() => 'CloseMessage(name=$_name, portal=$_isForPortal)';
291306
}
292307

293308
class SyncMessage extends ClientMessage {
@@ -298,6 +313,9 @@ class SyncMessage extends ClientMessage {
298313
buffer.writeUint8(ClientMessageId.sync);
299314
buffer.writeUint32(4);
300315
}
316+
317+
@override
318+
String toString() => 'SyncMessage';
301319
}
302320

303321
class TerminateMessage extends ClientMessage {

lib/src/messages/server_messages.dart

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -270,7 +270,7 @@ class CloseCompleteMessage extends ServerMessage {
270270
CloseCompleteMessage();
271271

272272
@override
273-
String toString() => 'Bind Complete Message';
273+
String toString() => 'Close Complete Message';
274274
}
275275

276276
class ParameterDescriptionMessage extends ServerMessage {

lib/src/v3/connection.dart

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import 'dart:async';
2+
import 'dart:collection';
23
import 'dart:convert';
34
import 'dart:io';
45
import 'dart:typed_data';
@@ -258,7 +259,6 @@ class PgConnectionImplementation extends _PgSessionBase implements Connection {
258259
),
259260
async.StreamSinkTransformer.fromHandlers(handleData: (msg, sink) {
260261
print('[$hash][out] $msg');
261-
print('[out] $msg');
262262
sink.add(msg);
263263
}),
264264
));
@@ -646,6 +646,14 @@ class _PreparedStatement extends Statement {
646646
final String _name;
647647
final _PgSessionBase _session;
648648

649+
/// Apparently, when we are in a transaction and using extended query mode,
650+
/// one needs to close the portals to release the locks on tables.
651+
/// This queue will collect the portal names to close and they will be closed
652+
/// when the prepared statement is disposed or a run call completes.
653+
///
654+
/// See more in https://github.com/isoos/postgresql-dart/issues/390
655+
Queue<String>? _portalsToClose;
656+
649657
_PreparedStatement(this._description, this._name, this._session);
650658

651659
@override
@@ -674,17 +682,33 @@ class _PreparedStatement extends Statement {
674682
);
675683
} finally {
676684
await subscription.cancel();
685+
await _closePendingPortals();
677686
}
678687
}
679688

680689
@override
681690
Future<void> dispose() async {
682691
// Don't send a dispose message if the connection is already closed.
683692
if (!_session._connection._isClosing) {
693+
await _closePendingPortals();
684694
await _session._sendAndWaitForQuery<CloseCompleteMessage>(
685695
CloseMessage.statement(_name));
686696
}
687697
}
698+
699+
void _addPortalToClose(String portalName) {
700+
_portalsToClose ??= Queue();
701+
_portalsToClose!.add(portalName);
702+
}
703+
704+
Future<void> _closePendingPortals() async {
705+
final list = _portalsToClose;
706+
while (list != null && list.isNotEmpty) {
707+
final portalName = list.removeFirst();
708+
await _session._sendAndWaitForQuery<CloseCompleteMessage>(
709+
CloseMessage.portal(portalName));
710+
}
711+
}
688712
}
689713

690714
class _BoundStatement extends Stream<ResultRow> implements ResultStream {
@@ -718,6 +742,7 @@ class _PgResultStreamSubscription
718742
final _schema = Completer<ResultSchema>();
719743
final _done = Completer<void>();
720744
ResultSchema? _resultSchema;
745+
_BoundStatement? _boundStatement;
721746

722747
@override
723748
PgConnectionImplementation get connection => session._connection;
@@ -729,6 +754,7 @@ class _PgResultStreamSubscription
729754
_BoundStatement statement, this._controller, this._source)
730755
: session = statement.statement._session,
731756
ignoreRows = false,
757+
_boundStatement = statement,
732758
_trace = StackTrace.current {
733759
_scheduleStatement(() async {
734760
connection._pending = this;
@@ -885,6 +911,11 @@ class _PgResultStreamSubscription
885911
// we'll get this more than once.
886912
_affectedRowsSoFar += message.rowsAffected;
887913
case ReadyForQueryMessage():
914+
// It looks like simple query protocol statements, or statements outside of a transaction
915+
// do not need the portal to be closed explicitly.
916+
if (message.state == ReadyForQueryMessageState.transaction) {
917+
_boundStatement?.statement._addPortalToClose(_portalName);
918+
}
888919
await _completeQuery();
889920
case CopyBothResponseMessage():
890921
// This message indicates a successful start for Streaming Replication.

pubspec.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
name: postgres
22
description: PostgreSQL database driver. Supports statement reuse and binary protocol and connection pooling.
3-
version: 3.4.3
3+
version: 3.4.4
44
homepage: https://github.com/isoos/postgresql-dart
55
topics:
66
- sql

test/v3_test.dart

Lines changed: 53 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -471,37 +471,74 @@ void main() {
471471
});
472472
});
473473

474-
withPostgresServer('issue 390', (server) {
475-
test(
476-
'issue 390 - Cannot ALTER TABLE because it is being used by active queries in this session',
477-
() async {
478-
final conn = await server.newConnection();
479-
const tableToAlter = 'table_to_alter';
480-
const otherTable = 'other_table';
474+
withPostgresServer('issue 390 - closing the portal of prepared statements',
475+
(server) {
476+
const tableToAlter = 'table_to_alter';
477+
const otherTable = 'other_table';
481478

479+
setUp(() async {
480+
final conn = await server.newConnection();
482481
await conn.execute('''
483482
CREATE TABLE $tableToAlter (
484483
a_id INTEGER PRIMARY KEY NOT NULL,
485484
a_other_id INTEGER NOT NULL
486485
);''');
487-
488486
await conn.execute(
489487
'CREATE TABLE $otherTable (other_id INTEGER PRIMARY KEY NOT NULL);');
488+
await conn.close();
489+
});
490+
491+
tearDown(() async {
492+
final conn = await server.newConnection();
493+
await conn.execute('DROP TABLE $tableToAlter;');
494+
await conn.execute('DROP TABLE $otherTable;');
495+
});
496+
497+
test('non-transaction update', () async {
498+
final conn = await server.newConnection();
499+
await conn.execute('SELECT * FROM $tableToAlter;');
500+
await conn.execute('''
501+
ALTER TABLE $tableToAlter
502+
ADD CONSTRAINT fk_other
503+
FOREIGN KEY (a_other_id)
504+
REFERENCES $otherTable(other_id);''');
505+
});
490506

507+
test('transaction via statement', () async {
508+
final conn = await server.newConnection();
509+
await conn.execute('BEGIN');
510+
await conn.execute('SELECT * FROM $tableToAlter;');
511+
await conn.execute('''
512+
ALTER TABLE $tableToAlter
513+
ADD CONSTRAINT fk_other
514+
FOREIGN KEY (a_other_id)
515+
REFERENCES $otherTable(other_id);''');
516+
await conn.execute('COMMIT');
517+
});
518+
519+
test('transaction via statement - simple query protocol', () async {
520+
final conn = await server.newConnection(queryMode: QueryMode.simple);
521+
await conn.execute('BEGIN');
522+
await conn.execute('SELECT * FROM $tableToAlter;');
523+
await conn.execute('''
524+
ALTER TABLE $tableToAlter
525+
ADD CONSTRAINT fk_other
526+
FOREIGN KEY (a_other_id)
527+
REFERENCES $otherTable(other_id);''');
528+
await conn.execute('COMMIT');
529+
});
530+
531+
test('transaction via runTx', () async {
532+
final conn = await server.newConnection();
491533
await conn.runTx((tx) async {
492-
// Select from the table that will be altered
493534
await tx.execute('SELECT * FROM $tableToAlter;');
494-
495-
// Add a foreign key constraint
496535
await tx.execute('''
497536
ALTER TABLE $tableToAlter
498-
ADD CONSTRAINT fk_other
499-
FOREIGN KEY (a_other_id)
537+
ADD CONSTRAINT fk_other
538+
FOREIGN KEY (a_other_id)
500539
REFERENCES $otherTable(other_id);''');
501540
});
502-
503-
// Should not throw
504-
}, skip: 'investigation needed');
541+
});
505542
});
506543
}
507544

0 commit comments

Comments
 (0)