1010//
1111//===----------------------------------------------------------------------===//
1212
13- public import Dispatch
1413public import Foundation
1514public import LanguageServerProtocol
1615@_spi ( SourceKitLSP) import SKLogging
@@ -49,15 +48,15 @@ public final class JSONRPCConnection: Connection {
4948 nonisolated ( unsafe) private var receiveHandler : MessageHandler ? = nil
5049
5150 /// Queue for synchronizing all messages to ensure they remain in order
52- private let queue : DispatchQueue = DispatchQueue ( label : " jsonrpc-queue " , qos : . userInitiated )
51+ private let queue : DispatchQueue
5352
54- /// Queue for reading off of `receiveFD`
55- private let readQueue : DispatchQueue = DispatchQueue ( label : " jsonrpc-read-queue " , qos : . userInitiated )
53+ /// Queue for the read loop (effectively just a separate thread - we never yield from the initial task)
54+ private let receiveQueue : DispatchQueue
5655
5756 /// Queue for sending any data through `sendFD`. This is currently needed as the read loop is blocked on messages
5857 /// being parsed on `queue` (in order to not add an extra copy), so we must perform any corresponding sends off of
5958 /// `queue`. If we ever change that, we can likely remove this queue.
60- private let sendQueue : DispatchQueue = DispatchQueue ( label : " jsonrpc-send-queue " , qos : . userInitiated )
59+ private let sendQueue : DispatchQueue
6160
6261 /// File descriptor for reading input (eg. stdin for an LSP server)
6362 private let receiveFD : FileHandle
@@ -137,6 +136,9 @@ public final class JSONRPCConnection: Connection {
137136 sendMirrorFile: FileHandle ? = nil
138137 ) {
139138 self . name = name
139+ self . queue = DispatchQueue ( label: " \( name) -jsonrpc-queue " , qos: . userInitiated)
140+ self . receiveQueue = DispatchQueue ( label: " \( name) -jsonrpc-read-queue " , qos: . userInitiated)
141+ self . sendQueue = DispatchQueue ( label: " \( name) -jsonrpc-send-queue " , qos: . userInitiated)
140142 self . receiveFD = receiveFD
141143 self . receiveMirrorFile = receiveMirrorFile
142144 self . sendFD = sendFD
@@ -248,7 +250,7 @@ public final class JSONRPCConnection: Connection {
248250 self . closeHandler = closeHandler
249251 }
250252
251- self . readQueue . async {
253+ self . receiveQueue . async {
252254 let parser = JSONMessageParser ( decoder: self . decodeJSONRPCMessage)
253255 while true {
254256 let data = orLog ( " Reading from \( self . name) " ) { try self . receiveFD. read ( upToCount: parser. nextReadLength) }
@@ -286,7 +288,7 @@ public final class JSONRPCConnection: Connection {
286288 \( message) . Please run 'sourcekit-lsp diagnose' to file an issue.
287289 """
288290 )
289- self . send ( . notification( showMessage) )
291+ self . sendAssumingOnQueue ( . notification( showMessage) )
290292 }
291293
292294 /// Decode a single JSONRPC message from the given `messageBytes`.
@@ -339,7 +341,7 @@ public final class JSONRPCConnection: Connection {
339341 logger. fault (
340342 " Replying to request \( id, privacy: . public) with error response because we failed to decode the request "
341343 )
342- self . send ( . errorResponse( ResponseError ( error) , id: id) )
344+ self . sendAssumingOnQueue ( . errorResponse( ResponseError ( error) , id: id) )
343345 return nil
344346 }
345347 // If we don't know the ID of the request, ignore it and show a notification to the user.
@@ -458,36 +460,33 @@ public final class JSONRPCConnection: Connection {
458460 /// If an unrecoverable error occurred on the channel's file descriptor, the connection gets closed.
459461 ///
460462 /// - Important: Must be called on `queue`
461- private func send ( data dispatchData : DispatchData ) {
463+ private func sendAssumingOnQueue ( data: Data ) {
462464 dispatchPrecondition ( condition: . onQueue( queue) )
463465
464466 guard readyToSend ( ) else { return }
465467
466- #if !os(macOS)
467- nonisolated ( unsafe) let dispatchData = dispatchData
468- #endif
469468 sendQueue. async {
470469 orLog ( " Writing send mirror file " ) {
471- try self . sendMirrorFile? . write ( contentsOf: dispatchData )
470+ try self . sendMirrorFile? . write ( contentsOf: data )
472471 }
473472
474473 do {
475- try self . sendFD. write ( contentsOf: dispatchData )
474+ try self . sendFD. write ( contentsOf: data )
476475 } catch {
477476 logger. fault ( " IO error sending message to \( self . name) : \( error. forLogging) " )
478477 self . close ( )
479478 }
480479 }
481480 }
482481
483- /// Wrapper of `send (data:)` that automatically switches to `queue`.
482+ /// Wrapper of `sendAssumingOnQueue (data:)` that automatically switches to `queue`.
484483 ///
485484 /// This should only be used to test that the client decodes messages correctly if data is delivered to it
486485 /// byte-by-byte instead of in larger chunks that contain entire messages.
487486 @_spi ( Testing)
488- public func send( _rawData dispatchData : DispatchData ) {
487+ public func send( data : Data ) {
489488 queue. sync {
490- self . send ( data: dispatchData )
489+ self . sendAssumingOnQueue ( data: data )
491490 }
492491 }
493492
@@ -496,14 +495,12 @@ public final class JSONRPCConnection: Connection {
496495 /// If an unrecoverable error occurred on the channel's file descriptor, the connection gets closed.
497496 ///
498497 /// - Important: Must be called on `queue`
499- private func send ( _ message: JSONRPCMessage ) {
498+ private func sendAssumingOnQueue ( _ message: JSONRPCMessage ) {
500499 dispatchPrecondition ( condition: . onQueue( queue) )
501500
502- let encoder = JSONEncoder ( )
503-
504- let data : Data
501+ let content : Data
505502 do {
506- data = try encoder . encode ( message)
503+ content = try JSONEncoder ( ) . encode ( message)
507504 } catch {
508505 logger. fault ( " Failed to encode message: \( error. forLogging) " )
509506 logger. fault ( " Malformed message: \( String ( describing: message) ) " )
@@ -541,16 +538,9 @@ public final class JSONRPCConnection: Connection {
541538 }
542539 }
543540
544- var dispatchData = DispatchData . empty
545- let header = " Content-Length: \( data. count) \r \n \r \n "
546- header. utf8. map { $0 } . withUnsafeBytes { buffer in
547- dispatchData. append ( buffer)
548- }
549- data. withUnsafeBytes { rawBufferPointer in
550- dispatchData. append ( rawBufferPointer)
551- }
552-
553- send ( data: dispatchData)
541+ let header = " Content-Length: \( content. count) \r \n \r \n "
542+ sendAssumingOnQueue ( data: Data ( header. utf8) )
543+ sendAssumingOnQueue ( data: content)
554544 }
555545
556546 /// Close the connection.
@@ -585,6 +575,12 @@ public final class JSONRPCConnection: Connection {
585575 orLog ( " Closing receiveFD to \( name) " ) {
586576 try receiveFD. close ( )
587577 }
578+ orLog ( " Closing sendMirrorFile to \( name) " ) {
579+ try sendMirrorFile? . close ( )
580+ }
581+ orLog ( " Closing receiveMirrorFile to \( name) " ) {
582+ try receiveMirrorFile? . close ( )
583+ }
588584 }
589585
590586 self . receiveHandler = nil
@@ -614,7 +610,7 @@ public final class JSONRPCConnection: Connection {
614610 \( notification. forLogging)
615611 """
616612 )
617- self . send ( . notification( notification) )
613+ self . sendAssumingOnQueue ( . notification( notification) )
618614 }
619615 }
620616
@@ -664,8 +660,7 @@ public final class JSONRPCConnection: Connection {
664660 """
665661 )
666662
667- self . send ( . request( request, id: id) )
668- return
663+ self . sendAssumingOnQueue ( . request( request, id: id) )
669664 }
670665 }
671666
@@ -674,9 +669,9 @@ public final class JSONRPCConnection: Connection {
674669 queue. async {
675670 switch response {
676671 case . success( let result) :
677- self . send ( . response( result, id: id) )
672+ self . sendAssumingOnQueue ( . response( result, id: id) )
678673 case . failure( let error) :
679- self . send ( . errorResponse( error, id: id) )
674+ self . sendAssumingOnQueue ( . errorResponse( error, id: id) )
680675 }
681676 }
682677 }
0 commit comments