Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 27 additions & 18 deletions packages/php-wasm/universal/src/lib/php-request-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import {
} from './urls';
import type { PHP, PHPExecutionFailureError } from './php';
import { normalizeHeaders } from './php';
import { PHPResponse } from './php-response';
import { PHPResponse, StreamedPHPResponse } from './php-response';
import type { PHPRequest, PHPRunOptions } from './universal-php';
import { encodeAsMultipart } from './encode-as-multipart';
import type { PHPFactoryOptions } from './php-process-manager';
Expand Down Expand Up @@ -371,7 +371,9 @@ export class PHPRequestHandler implements AsyncDisposable {
*
* @param request - PHP Request data.
*/
async request(request: PHPRequest): Promise<PHPResponse> {
async request(
request: PHPRequest
): Promise<PHPResponse | StreamedPHPResponse> {
const isAbsolute = looksLikeAbsoluteUrl(request.url);
const originalRequestUrl = new URL(
// Remove the hash part of the URL as it's not meant for the server.
Expand Down Expand Up @@ -508,18 +510,23 @@ export class PHPRequestHandler implements AsyncDisposable {
);

/**
* If the response is but the exit code is non-zero, let's rewrite the
* If the response is buffered and the exit code is non-zero, let's rewrite the
* HTTP status code as 500. We're acting as a HTTP server here and
* this behavior is in line with what Nginx and Apache do.
*
* For streamed responses, we can't rewrite the status code since the
* response is already being consumed, so we return it as-is.
*/
if (response.ok() && response.exitCode !== 0) {
return new PHPResponse(
500,
response.headers,
response.bytes,
response.errors,
response.exitCode
);
if (response instanceof PHPResponse) {
if (response.ok() && response.exitCode !== 0) {
return new PHPResponse(
500,
response.headers,
response.bytes,
response.errors,
response.exitCode
);
}
}
return response;
} else {
Expand Down Expand Up @@ -587,7 +594,7 @@ export class PHPRequestHandler implements AsyncDisposable {
originalRequestUrl: URL,
rewrittenRequestUrl: URL,
scriptPath: string
): Promise<PHPResponse> {
): Promise<PHPResponse | StreamedPHPResponse> {
let spawnedPHP: AcquiredPHP | undefined = undefined;
try {
spawnedPHP = await this.instanceManager!.acquirePHPInstance({
Expand Down Expand Up @@ -626,7 +633,7 @@ export class PHPRequestHandler implements AsyncDisposable {
originalRequestUrl: URL,
rewrittenRequestUrl: URL,
scriptPath: string
): Promise<PHPResponse> {
): Promise<PHPResponse | StreamedPHPResponse> {
let preferredMethod: PHPRunOptions['method'] = 'GET';

const headers: Record<string, string> = {
Expand All @@ -646,7 +653,8 @@ export class PHPRequestHandler implements AsyncDisposable {
}

try {
const response = await php.run({
// Use runStream() to keep responses as streams
const streamedResponse = await php.runStream({
relativeUri: ensurePathPrefix(
toRelativeUrl(new URL(rewrittenRequestUrl.toString())),
this.#PATHNAME
Expand All @@ -662,13 +670,14 @@ export class PHPRequestHandler implements AsyncDisposable {
scriptPath,
headers,
});

if (this.#cookieStore) {
this.#cookieStore.rememberCookiesFromResponseHeaders(
response.headers
);
const headers = await streamedResponse.headers;
this.#cookieStore.rememberCookiesFromResponseHeaders(headers);
}

return response;
// Return the stream as-is to avoid buffering large files
return streamedResponse;
} catch (error) {
const executionError = error as PHPExecutionFailureError;
if (executionError?.response) {
Expand Down
22 changes: 21 additions & 1 deletion packages/php-wasm/universal/src/lib/php-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,9 @@ export class PHPWorker implements LimitedPHPApi, AsyncDisposable {
}

/** @inheritDoc @php-wasm/universal!PHPRequestHandler.request */
async request(request: PHPRequest): Promise<PHPResponse> {
async request(
request: PHPRequest
): Promise<PHPResponse | StreamedPHPResponse> {
const requestHandler = _private.get(this)!.requestHandler!;
return await requestHandler.request(request);
}
Expand All @@ -188,6 +190,24 @@ export class PHPWorker implements LimitedPHPApi, AsyncDisposable {
}
}

/** @inheritDoc @php-wasm/universal!/PHP.runStream */
async runStream(request: PHPRunOptions): Promise<StreamedPHPResponse> {
const { php, reap } = await this.acquirePHPInstance();
let response: StreamedPHPResponse;
try {
response = await php.runStream(request);
} catch (error) {
reap();
throw error;
}
/**
* The StreamedPHPResponse object must be reapable.
* Let's ensure all streams complete before reaping.
*/
response.finished.then(reap, reap);
return response;
}

/** @inheritDoc @php-wasm/universal!/PHP.cli */
async cli(
argv: string[],
Expand Down
4 changes: 3 additions & 1 deletion packages/php-wasm/universal/src/lib/php.ts
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,9 @@ export class PHP implements Disposable {
* Do not use. Use new PHPRequestHandler() instead.
* @deprecated
*/
async request(request: PHPRequest): Promise<PHPResponse> {
async request(
request: PHPRequest
): Promise<PHPResponse | StreamedPHPResponse> {
logger.warn(
'PHP.request() is deprecated. Please use new PHPRequestHandler() instead.'
);
Expand Down
10 changes: 9 additions & 1 deletion packages/playground/blueprints/src/lib/steps/export-wxr.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import type { UniversalPHP } from '@php-wasm/universal';
import { PHPResponse, StreamedPHPResponse } from '@php-wasm/universal';

/**
* Exports the WordPress database as a WXR file using
Expand All @@ -11,5 +12,12 @@ export async function exportWXR(playground: UniversalPHP) {
const databaseExportResponse = await playground.request({
url: '/wp-admin/export.php?download=true&content=all',
});
return new File([databaseExportResponse.bytes], 'export.xml');

// Handle both buffered and streamed responses
const bytes =
databaseExportResponse instanceof StreamedPHPResponse
? await databaseExportResponse.stdoutBytes
: databaseExportResponse.bytes;

return new File([bytes], 'export.xml');
}
9 changes: 7 additions & 2 deletions packages/playground/cli/src/load-balancer.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
import type { PHPRequest, PHPResponse, RemoteAPI } from '@php-wasm/universal';
import type {
PHPRequest,
PHPResponse,
StreamedPHPResponse,
RemoteAPI,
} from '@php-wasm/universal';
import type { PlaygroundCliBlueprintV1Worker as PlaygroundCliWorkerV1 } from './blueprints-v1/worker-thread-v1';
import type { PlaygroundCliBlueprintV2Worker as PlaygroundCliWorkerV2 } from './blueprints-v2/worker-thread-v2';

Expand All @@ -11,7 +16,7 @@ type PlaygroundCliWorker = PlaygroundCliWorkerV1 | PlaygroundCliWorkerV2;
// TODO: Could we just spawn a worker using the factory function to PHPProcessManager?
type WorkerLoad = {
worker: RemoteAPI<PlaygroundCliWorker>;
activeRequests: Set<Promise<PHPResponse>>;
activeRequests: Set<Promise<PHPResponse | StreamedPHPResponse>>;
};
export class LoadBalancer {
workerLoads: WorkerLoad[] = [];
Expand Down
46 changes: 39 additions & 7 deletions packages/playground/cli/src/start-server.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
import { type PHPRequest, PHPResponse } from '@php-wasm/universal';
import {
type PHPRequest,
PHPResponse,
StreamedPHPResponse,
} from '@php-wasm/universal';
import type { Request } from 'express';
import express from 'express';
import type { IncomingMessage, Server, ServerResponse } from 'http';
Expand All @@ -9,7 +13,9 @@ import { logger } from '@php-wasm/logger';
export interface ServerOptions {
port: number;
onBind: (server: Server, port: number) => Promise<RunCLIServer | void>;
handleRequest: (request: PHPRequest) => Promise<PHPResponse>;
handleRequest: (
request: PHPRequest
) => Promise<PHPResponse | StreamedPHPResponse>;
}

export async function startServer(
Expand All @@ -31,7 +37,7 @@ export async function startServer(
});

app.use('/', async (req, res) => {
let phpResponse: PHPResponse;
let phpResponse: PHPResponse | StreamedPHPResponse;
try {
phpResponse = await options.handleRequest({
url: req.url,
Expand All @@ -44,11 +50,37 @@ export async function startServer(
phpResponse = PHPResponse.forHttpCode(500);
}

res.statusCode = phpResponse.httpStatusCode;
for (const key in phpResponse.headers) {
res.setHeader(key, phpResponse.headers[key]);
// Handle streamed responses
if (phpResponse instanceof StreamedPHPResponse) {
res.statusCode = await phpResponse.httpStatusCode;
const headers = await phpResponse.headers;
for (const key in headers) {
res.setHeader(key, headers[key]);
}

// Stream the response body
const reader = phpResponse.stdout.getReader();
try {
while (true) {
const { done, value } = await reader.read();
if (done) break;
if (value) {
res.write(value);
}
}
res.end();
} catch (error) {
logger.error('Error streaming response:', error);
res.end();
}
} else {
// Handle buffered responses
res.statusCode = phpResponse.httpStatusCode;
for (const key in phpResponse.headers) {
res.setHeader(key, phpResponse.headers[key]);
}
res.end(phpResponse.bytes);
}
res.end(phpResponse.bytes);
});

const address = server.address();
Expand Down
Loading