From 0661eaed55545da9bbe983b492b0eeb57645ebf8 Mon Sep 17 00:00:00 2001 From: Tim McKenzie Date: Wed, 15 Jul 2015 16:41:57 -0700 Subject: [PATCH] Emit sentFrame and receivedFrame events from endpoint. Want to be able to inspect frames as the come and go. Added an onEndpoint parameter to Agent.request() and EndPoint(). Needed onEndpoint because after constructor for EndPoint has returned some frames have already been sent, and would be missed by any event handler. Also added onSentFrame and onReceivedFrame parameters to Connection() constructor for same reason. Refactored the constructor for EndPoint and Connection to take a config object instead of a list of parameters. The parameter list on the constructor was getting pretty long. Added test for sentFrame, and receiveFrame. Updated existing tests to handle new constructor for EndPoint and Connection. Conflicts: lib/http.js --- lib/http.js | 42 ++++++++++++++++++++++++++++--- lib/protocol/connection.js | 46 +++++++++++++++++++++++++++------- lib/protocol/endpoint.js | 48 +++++++++++++++++++++++++++-------- test/connection.js | 6 ++--- test/endpoint.js | 24 +++++++++++++++--- test/http.js | 51 ++++++++++++++++++++++++++++++++++++++ 6 files changed, 186 insertions(+), 31 deletions(-) diff --git a/lib/http.js b/lib/http.js index 46cc92ff..9f2e5780 100644 --- a/lib/http.js +++ b/lib/http.js @@ -17,6 +17,8 @@ // - **Event: 'connection' (socket, [endpoint])**: there's a second argument if the negotiation of // HTTP/2 was successful: the reference to the [Endpoint](endpoint.html) object tied to the // socket. +// - **Event: 'endpoint' (endpoint)**: the endpoint created by the new connection. Emitted +// before any data has been transmitted. // // - **http2.createServer(options, [requestListener])**: additional option: // - **log**: an optional [bunyan](https://github.com/trentm/node-bunyan) logger object @@ -38,6 +40,7 @@ // - **http2.request(options, [callback])**: additional option: // - **plain**: if `true`, the client will not try to build a TLS tunnel, instead it will use // the raw TCP stream for HTTP/2 +// - **onEndpoint**: A event handler for the endpoint event. // // - **Class: http2.ClientRequest** // - **Event: 'socket' (socket)**: in case of an HTTP/2 incoming message, `socket` is a reference @@ -45,6 +48,8 @@ // - **Event: 'push' (promise)**: signals the intention of a server push associated to this // request. `promise` is an IncomingPromise. If there's no listener for this event, the server // push is cancelled. +// - **Event: 'endpoint'**: Emited when the endpoint for the request has been created. If the +// request is reusing a connection, the existing Endpoint is emitted. // - **request.setPriority(priority)**: assign a priority to this request. `priority` is a number // between 0 (highest priority) and 2^31-1 (lowest priority). Default value is 2^30. // @@ -446,7 +451,15 @@ Server.prototype = Object.create(EventEmitter.prototype, { constructor: { value: // Starting HTTP/2 Server.prototype._start = function _start(socket) { - var endpoint = new Endpoint(this._log, 'SERVER', this._settings); + var self = this; + var endpoint = new Endpoint({ + log: this._log, + role: 'SERVER', + settings: this._settings, + onEndpoint: function (endpoint) { + self.emit('endpoint', endpoint); + } + }); this._log.info({ e: endpoint, client: socket.remoteAddress + ':' + socket.remotePort, @@ -841,6 +854,11 @@ Agent.prototype.request = function request(options, callback) { var request = new OutgoingRequest(this._log); + // * dealing with endpoint event handler + if (typeof options.onEndpoint === 'function') { + request.addListener('endpoint', options.onEndpoint); + } + if (callback) { request.on('response', callback); } @@ -854,17 +872,27 @@ Agent.prototype.request = function request(options, callback) { // * There's an existing HTTP/2 connection to this host if (key in this.endpoints) { var endpoint = this.endpoints[key]; + this.emit('endpoint', endpoint); + request._start(endpoint.createStream(), options); } // * HTTP/2 over plain TCP else if (options.plain) { - endpoint = new Endpoint(this._log, 'CLIENT', this._settings); + endpoint = new Endpoint({ + log: this._log, + role: 'CLIENT', + settings: this._settings, + onEndpoint: options.onEndpoint + }); + endpoint.socket = net.connect({ host: options.host, port: options.port, localAddress: options.localAddress }); + this.endpoints[key] = endpoint; + endpoint.pipe(endpoint.socket).pipe(endpoint); request._start(endpoint.createStream(), options); } @@ -879,7 +907,7 @@ Agent.prototype.request = function request(options, callback) { options.ciphers = options.ciphers || cipherSuites; var httpsRequest = https.request(options); - httpsRequest.on('socket', function(socket) { + httpsRequest.on('socket', function (socket) { var negotiatedProtocol = socket.alpnProtocol || socket.npnProtocol; if (negotiatedProtocol != null) { // null in >=0.11.0, undefined in <0.11.0 negotiated(); @@ -895,7 +923,13 @@ Agent.prototype.request = function request(options, callback) { if (negotiatedProtocol === protocol.VERSION) { httpsRequest.socket.emit('agentRemove'); unbundleSocket(httpsRequest.socket); - endpoint = new Endpoint(self._log, 'CLIENT', self._settings); + endpoint = new Endpoint({ + log: self._log, + role: 'CLIENT', + settings: self._settings, + onEndpoint: options.onEndpoint + }); + endpoint.socket = httpsRequest.socket; endpoint.pipe(endpoint.socket).pipe(endpoint); } diff --git a/lib/protocol/connection.js b/lib/protocol/connection.js index 18f26ae7..e28663e8 100644 --- a/lib/protocol/connection.js +++ b/lib/protocol/connection.js @@ -14,7 +14,12 @@ exports.Connection = Connection; // Public API // ---------- -// * **new Connection(log, firstStreamId, settings)**: create a new Connection +// * **new Connection(config)**: create a new Connection +// - `config.log`: bunyan logger of the parent +// - `config.firstStreamId`: the ID of the first outbound stream +// - `config.settings`: initial HTTP/2 settings +// - `config.onSentFrame`: Event handler for sentFrame event +// - `config.onReceivedFrame`: Event handler for receivedFrame event // // * **Event: 'error' (type)**: signals a connection level error made by the other end // @@ -23,6 +28,10 @@ exports.Connection = Connection; // // * **Event: 'stream' (stream)**: signals that there's an incoming stream // +// * **Event: 'sentFrame' (frame)**: signals a frame has been sent to the remote. +// +// * **Event: 'receivedFrame' (frame)**: signals a frame has been received from the remote. +// // * **createStream(): stream**: initiate a new stream // // * **set(settings, callback)**: change the value of one or more settings according to the @@ -36,15 +45,27 @@ exports.Connection = Connection; // ----------- // The main aspects of managing the connection are: -function Connection(log, firstStreamId, settings) { +function Connection(config) { // * initializing the base class Flow.call(this, 0); + // * save settings object + this._config = config || {}; + + // * attach frame events + if (typeof this._config.onSentFrame === 'function') { + this.addListener('sentFrame', this._config.onSentFrame); + } + + if (typeof this._config.onReceivedFrame === 'function') { + this.addListener('receivedFrame', this._config.onReceivedFrame); + } + // * logging: every method uses the common logger object - this._log = log.child({ component: 'connection' }); + this._log = this._config.log.child({component: 'connection'}); // * stream management - this._initializeStreamManagement(firstStreamId); + this._initializeStreamManagement(this._config.firstStreamId); // * lifecycle management this._initializeLifecycleManagement(); @@ -53,7 +74,7 @@ function Connection(log, firstStreamId, settings) { this._initializeFlowControl(); // * settings management - this._initializeSettingsManagement(settings); + this._initializeSettingsManagement(this._config.connectionSettings); // * multiplexing this._initializeMultiplexing(); @@ -308,6 +329,8 @@ priority_loop: continue; } + this.emit('sentFrame', frame); + nextBucket.push(stream); if (frame.stream === undefined) { @@ -371,6 +394,7 @@ Connection.prototype._receive = function _receive(frame, done) { // * and writes it to the `stream`'s `upstream` stream.upstream.write(frame); + this.emit('receivedFrame', frame); done(); }; @@ -451,13 +475,17 @@ Connection.prototype.set = function set(settings, callback) { } }); - // * Sending out the SETTINGS frame - this.push({ + var settingsFrame = { type: 'SETTINGS', - flags: { ACK: false }, + flags: {ACK: false}, stream: 0, settings: settings - }); + }; + + this.emit('sentFrame', settingsFrame); + + // * Sending out the SETTINGS frame + this.push(settingsFrame); for (var name in settings) { this.emit('SENDING_' + name, settings[name]); } diff --git a/lib/protocol/endpoint.js b/lib/protocol/endpoint.js index a218db04..2cb84e91 100644 --- a/lib/protocol/endpoint.js +++ b/lib/protocol/endpoint.js @@ -16,12 +16,12 @@ exports.Endpoint = Endpoint; // Public API // ---------- -// - **new Endpoint(log, role, settings, filters)**: create a new Endpoint. +// - **new Endpoint(config)**: create a new Endpoint. // -// - `log`: bunyan logger of the parent -// - `role`: 'CLIENT' or 'SERVER' -// - `settings`: initial HTTP/2 settings -// - `filters`: a map of functions that filter the traffic between components (for debugging or +// - `config.log`: bunyan logger of the parent +// - `config.role`: 'CLIENT' or 'SERVER' +// - `config.settings`: initial HTTP/2 settings +// - `config.filters`: a map of functions that filter the traffic between components (for debugging or // intentional failure injection). // // Filter functions get three arguments: @@ -37,6 +37,11 @@ exports.Endpoint = Endpoint; // // * **Event: 'stream' (Stream)**: 'stream' event forwarded from the underlying Connection // +// * **Event: 'sentFrame' (frame)**: signals a frame has been sent to the remote. Raised from underlying connection. +// +// * **Event: 'receivedFrame' (frame)**: signals a frame has been received to the remote. Raised from +// underlying connection. +// // * **Event: 'error' (type)**: signals an error // // * **createStream(): Stream**: initiate a new stream (forwarded to the underlying Connection) @@ -47,16 +52,25 @@ exports.Endpoint = Endpoint; // ----------- // The process of initialization: -function Endpoint(log, role, settings, filters) { +function Endpoint(config) { Duplex.call(this); + this._config = config || {}; + + // * Handle onEndpoint + if (typeof this._config.onEndpoint === 'function') { + this.addListener('endpoint', this._config.onEndpoint); + } + + this.emit('endpoint', this); + // * Initializing logging infrastructure - this._log = log.child({ component: 'endpoint', e: this }); + this._log = this._config.log.child({component: 'endpoint', e: this}); // * First part of the handshake process: sending and receiving the client connection header // prelude. - assert((role === 'CLIENT') || role === 'SERVER'); - if (role === 'CLIENT') { + assert((this._config.role === 'CLIENT') || this._config.role === 'SERVER'); + if (this._config.role === 'CLIENT') { this._writePrelude(); } else { this._readPrelude(); @@ -65,7 +79,7 @@ function Endpoint(log, role, settings, filters) { // * Initialization of component. This includes the second part of the handshake process: // sending the first SETTINGS frame. This is done by the connection class right after // initialization. - this._initializeDataFlow(role, settings, filters || {}); + this._initializeDataFlow(this._config.role, this._config.settings, this._config.filters || {}); // * Initialization of management code. this._initializeManagement(); @@ -169,6 +183,8 @@ function pipeAndFilter(stream1, stream2, filter) { Endpoint.prototype._initializeDataFlow = function _initializeDataFlow(role, settings, filters) { var firstStreamId, compressorRole, decompressorRole; + var self = this; + if (role === 'CLIENT') { firstStreamId = 1; compressorRole = 'REQUEST'; @@ -183,7 +199,17 @@ Endpoint.prototype._initializeDataFlow = function _initializeDataFlow(role, sett this._deserializer = new Deserializer(this._log); this._compressor = new Compressor(this._log, compressorRole); this._decompressor = new Decompressor(this._log, decompressorRole); - this._connection = new Connection(this._log, firstStreamId, settings); + this._connection = new Connection({ + log: this._log, + firstStreamId: firstStreamId, + connectionSettings: settings, + onSentFrame: function (frame) { + self.emit('sentFrame', frame); + }, + onReceivedFrame: function (frame) { + self.emit('receivedFrame', frame); + } + }); pipeAndFilter(this._connection, this._compressor, filters.beforeCompression); pipeAndFilter(this._compressor, this._serializer, filters.beforeSerialization); diff --git a/test/connection.js b/test/connection.js index 2c68857f..0002c8fd 100644 --- a/test/connection.js +++ b/test/connection.js @@ -65,7 +65,7 @@ describe('connection.js', function() { describe('invalid operation', function() { describe('unsolicited ping answer', function() { it('should be ignored', function() { - var connection = new Connection(util.log, 1, settings); + var connection = new Connection({log: util.log, firstStreamId: 1, connectionSettings: settings}); connection._receivePing({ stream: 0, @@ -82,8 +82,8 @@ describe('connection.js', function() { describe('test scenario', function() { var c, s; beforeEach(function() { - c = new Connection(util.log.child({ role: 'client' }), 1, settings); - s = new Connection(util.log.child({ role: 'client' }), 2, settings); + c = new Connection({log: util.log.child({ role: 'client' }), firstStreamId: 1, connectionSettings: settings}); + s = new Connection({log: util.log.child({ role: 'client' }), firstStreamId: 2, connectionSettings: settings}); c.pipe(s).pipe(c); }); diff --git a/test/endpoint.js b/test/endpoint.js index bdd2569d..b8a326f1 100644 --- a/test/endpoint.js +++ b/test/endpoint.js @@ -13,8 +13,16 @@ describe('endpoint.js', function() { describe('scenario', function() { describe('connection setup', function() { it('should work as expected', function(done) { - var c = new Endpoint(util.log.child({ role: 'client' }), 'CLIENT', settings); - var s = new Endpoint(util.log.child({ role: 'client' }), 'SERVER', settings); + var c = new Endpoint({ + log: util.log.child({role: 'client'}), + role: 'CLIENT', + settings: settings + }); + var s = new Endpoint({ + log: util.log.child({role: 'client'}), + role: 'SERVER', + settings: settings + }); util.log.debug('Test initialization over, starting piping.'); c.pipe(s).pipe(c); @@ -30,8 +38,16 @@ describe('endpoint.js', function() { describe('`e`', function() { var format = endpoint.serializers.e; it('should assign a unique ID to each endpoint', function() { - var c = new Endpoint(util.log.child({ role: 'client' }), 'CLIENT', settings); - var s = new Endpoint(util.log.child({ role: 'client' }), 'SERVER', settings); + var c = new Endpoint({ + log: util.log.child({role: 'client'}), + role: 'CLIENT', + settings: settings + }); + var s = new Endpoint({ + log: util.log.child({role: 'client'}), + role: 'SERVER', + settings: settings + }); expect(format(c)).to.not.equal(format(s)); expect(format(c)).to.equal(format(c)); expect(format(s)).to.equal(format(s)); diff --git a/test/http.js b/test/http.js index 42bc68e3..ddb4db5c 100644 --- a/test/http.js +++ b/test/http.js @@ -506,6 +506,57 @@ describe('http.js', function() { }); }); }); + describe('raise events from all sent and received frames',function(){ + it('should work as expected', function (done) { + var path = '/x'; + var receiveFrameTypes = []; + var sentFrameTypes = []; + var message = 'Hello world!'; + var server = http2.createServer(options, function (request, response) { + expect(request.url).to.equal(path); + + request.on('data', util.noop); + request.once('end', function () { + response.end(message); + }); + }); + + server.listen(1245, function () { + var request = http2.request({ + protocol: 'https:', + host: 'localhost', + port: 1245, + path: path, + + onEndpoint: function (endpoint) { + endpoint.on('sentFrame', function (frame) { + sentFrameTypes.push(frame.type); + }); + + endpoint.on('receivedFrame', function (frame) { + receiveFrameTypes.push(frame.type); + + }); + } + }); + + request.end(); + request.on('response', function (response) { + + response.once('finish', function () { + expect(sentFrameTypes).to.include('SETTINGS'); + expect(sentFrameTypes).to.include('HEADERS'); + + expect(receiveFrameTypes).to.include('SETTINGS'); + expect(receiveFrameTypes).to.include('HEADERS'); + expect(receiveFrameTypes).to.include('DATA'); + + done(); + }); + }); + }); + }); + }); describe('server push', function() { it('should work as expected', function(done) { var path = '/x';