How to use the defaultReaderClosedPromiseInitialize method in wpt

Best JavaScript code snippet using wpt

readable-stream.ts

Source: readable-stream.ts (GitHub)

copy

Full Screen

// ... (the excerpt begins inside a function whose header is not shown; its tail is preserved unchanged)
  reader._forAuthorCode = true;
  reader._ownerReadableStream = stream;
  stream._reader = reader;
  if (stream._state === 'readable') {
    defaultReaderClosedPromiseInitialize(reader);
  } else if (stream._state === 'closed') {
    defaultReaderClosedPromiseInitializeAsResolved(reader);
  } else {
    assert(stream._state === 'errored');
    defaultReaderClosedPromiseInitializeAsRejected(reader, stream._storedError);
  }
}

// A client of ReadableStreamDefaultReader and ReadableStreamBYOBReader may use these functions directly to bypass state
// check.

/**
 * Cancels the stream owned by `reader`.
 *
 * Asserts that the reader still owns a stream and returns whatever `ReadableStreamCancel` returns for it.
 *
 * @param reader - Reader whose owner stream is cancelled.
 * @param reason - Value forwarded unchanged to `ReadableStreamCancel`.
 */
function ReadableStreamReaderGenericCancel(reader: ReadableStreamReader<any>, reason: any): Promise<void> {
  const stream = reader._ownerReadableStream;
  assert(stream !== undefined);
  return ReadableStreamCancel(stream, reason);
}

/**
 * Unlinks `reader` from its owner stream.
 *
 * While the stream is still 'readable' the reader's closed promise is rejected through
 * `defaultReaderClosedPromiseReject`; in any other state `defaultReaderClosedPromiseResetToRejected` is used instead.
 * Both receive the same TypeError. Afterwards the stream -> reader and reader -> stream links are cleared.
 */
function ReadableStreamReaderGenericRelease(reader: ReadableStreamReader<any>) {
  assert(reader._ownerReadableStream !== undefined);
  assert(reader._ownerReadableStream._reader === reader);

  if (reader._ownerReadableStream._state === 'readable') {
    defaultReaderClosedPromiseReject(
      reader,
      new TypeError('Reader was released and can no longer be used to monitor the stream\'s closedness'));
  } else {
    defaultReaderClosedPromiseResetToRejected(
      reader,
      new TypeError('Reader was released and can no longer be used to monitor the stream\'s closedness'));
  }

  reader._ownerReadableStream._reader = undefined;
  reader._ownerReadableStream = undefined!;
}

/**
 * Performs a BYOB read of `view` through `reader`.
 *
 * Marks the stream as disturbed, rejects with the stored error if the stream is errored, and otherwise delegates to
 * `ReadableByteStreamControllerPullInto`.
 */
function ReadableStreamBYOBReaderRead<T extends ArrayBufferView>(
  reader: ReadableStreamBYOBReader,
  view: T
): Promise<ReadResult<T>> {
  const stream = reader._ownerReadableStream;

  assert(stream !== undefined);

  stream._disturbed = true;

  if (stream._state === 'errored') {
    return Promise.reject(stream._storedError);
  }

  // Controllers must implement this.
  return ReadableByteStreamControllerPullInto(stream._readableStreamController as ReadableByteStreamController,
                                              view);
}

/**
 * Performs a default read through `reader`.
 *
 * Marks the stream as disturbed. A closed stream yields a `done` read result, an errored stream yields a rejection
 * with the stored error, and a readable stream delegates to the controller's `[PullSteps]`.
 */
function ReadableStreamDefaultReaderRead<R>(reader: ReadableStreamDefaultReader<R>): Promise<ReadResult<R>> {
  const stream = reader._ownerReadableStream;

  assert(stream !== undefined);

  stream._disturbed = true;

  if (stream._state === 'closed') {
    return Promise.resolve(ReadableStreamCreateReadResult<R>(undefined, true, reader._forAuthorCode));
  }

  if (stream._state === 'errored') {
    return Promise.reject(stream._storedError);
  }

  assert(stream._state === 'readable');

  return stream._readableStreamController[PullSteps]() as unknown as Promise<ReadResult<R>>;
}

// Controllers

export type ReadableStreamDefaultControllerType<R> = ReadableStreamDefaultController<R>;

/**
 * Controller of a default (non-byte) readable stream.
 *
 * The constructor always throws; instances are created with `Object.create(prototype)` and populated by
 * `SetUpReadableStreamDefaultController`. Every public member first brand-checks `this` with
 * `IsReadableStreamDefaultController`.
 */
class ReadableStreamDefaultController<R> {
  /** @internal */
  _controlledReadableStream!: ReadableStream<R>;
  /** @internal */
  _queue!: SimpleQueue<QueuePair<R>>;
  /** @internal */
  _queueTotalSize!: number;
  /** @internal */
  _started!: boolean;
  /** @internal */
  _closeRequested!: boolean;
  /** @internal */
  _pullAgain!: boolean;
  /** @internal */
  _pulling!: boolean;
  /** @internal */
  _strategySizeAlgorithm!: QueuingStrategySizeCallback<R>;
  /** @internal */
  _strategyHWM!: number;
  /** @internal */
  _pullAlgorithm!: () => Promise<void>;
  /** @internal */
  _cancelAlgorithm!: (reason: any) => Promise<void>;

  /** @internal */
  constructor() {
    throw new TypeError();
  }

  /** High-water mark minus queued size; `null` when the stream is errored, `0` when it is closed. */
  get desiredSize(): number | null {
    if (IsReadableStreamDefaultController(this) === false) {
      throw defaultControllerBrandCheckException('desiredSize');
    }

    return ReadableStreamDefaultControllerGetDesiredSize(this);
  }

  /**
   * Requests that the stream be closed.
   *
   * @throws TypeError when close was already requested or the stream is not 'readable'.
   */
  close(): void {
    if (IsReadableStreamDefaultController(this) === false) {
      throw defaultControllerBrandCheckException('close');
    }

    if (ReadableStreamDefaultControllerCanCloseOrEnqueue(this) === false) {
      throw new TypeError('The stream is not in a state that permits close');
    }

    ReadableStreamDefaultControllerClose(this);
  }

  /**
   * Hands `chunk` to the stream.
   *
   * @throws TypeError when close was already requested or the stream is not 'readable'.
   */
  enqueue(chunk: R): void {
    if (IsReadableStreamDefaultController(this) === false) {
      throw defaultControllerBrandCheckException('enqueue');
    }

    if (ReadableStreamDefaultControllerCanCloseOrEnqueue(this) === false) {
      throw new TypeError('The stream is not in a state that permits enqueue');
    }

    return ReadableStreamDefaultControllerEnqueue(this, chunk);
  }

  /** Errors the stream with `e` (a no-op unless the stream is still 'readable'). */
  error(e: any): void {
    if (IsReadableStreamDefaultController(this) === false) {
      throw defaultControllerBrandCheckException('error');
    }

    ReadableStreamDefaultControllerError(this, e);
  }

  /** @internal */
  [CancelSteps](reason: any): Promise<void> {
    ResetQueue(this);
    // The cancel algorithm must be invoked before the algorithms are cleared.
    const result = this._cancelAlgorithm(reason);
    ReadableStreamDefaultControllerClearAlgorithms(this);
    return result;
  }

  /** @internal */
  [PullSteps](): Promise<ReadResult<R>> {
    const stream = this._controlledReadableStream;

    if (this._queue.length > 0) {
      const chunk = DequeueValue(this);

      if (this._closeRequested === true && this._queue.length === 0) {
        // The last queued chunk was just taken and close was requested earlier: finish closing now.
        ReadableStreamDefaultControllerClearAlgorithms(this);
        ReadableStreamClose(stream);
      } else {
        ReadableStreamDefaultControllerCallPullIfNeeded(this);
      }

      return Promise.resolve(ReadableStreamCreateReadResult(chunk, false, stream._reader!._forAuthorCode));
    }

    // Nothing queued: register a pending read request, then ask the source for more.
    const pendingPromise = ReadableStreamAddReadRequest(stream);
    ReadableStreamDefaultControllerCallPullIfNeeded(this);
    return pendingPromise;
  }
}

// Abstract operations for the ReadableStreamDefaultController.

/** Brand check: true when `x` is an object that has its own `_controlledReadableStream` property. */
function IsReadableStreamDefaultController<R>(x: any): x is ReadableStreamDefaultController<R> {
  if (!typeIsObject(x)) {
    return false;
  }

  if (!Object.prototype.hasOwnProperty.call(x, '_controlledReadableStream')) {
    return false;
  }

  return true;
}

/**
 * Invokes the pull algorithm when `ReadableStreamDefaultControllerShouldCallPull` says so.
 *
 * If a pull is already in flight, `_pullAgain` is set instead so that another pull is attempted once the current
 * one fulfills. A rejected pull errors the controller.
 */
function ReadableStreamDefaultControllerCallPullIfNeeded(controller: ReadableStreamDefaultController<any>): void {
  const shouldPull = ReadableStreamDefaultControllerShouldCallPull(controller);
  if (shouldPull === false) {
    return;
  }

  if (controller._pulling === true) {
    controller._pullAgain = true;
    return;
  }

  assert(controller._pullAgain === false);

  controller._pulling = true;

  const pullPromise = controller._pullAlgorithm();
  pullPromise.then(
    () => {
      controller._pulling = false;

      if (controller._pullAgain === true) {
        controller._pullAgain = false;
        ReadableStreamDefaultControllerCallPullIfNeeded(controller);
      }
    },
    e => {
      ReadableStreamDefaultControllerError(controller, e);
    }
  ).catch(rethrowAssertionErrorRejection);
}

/**
 * Decides whether the pull algorithm should run now.
 *
 * False when the controller cannot close/enqueue or has not started; true when the stream is locked with pending
 * read requests or when the desired size is positive.
 */
function ReadableStreamDefaultControllerShouldCallPull(controller: ReadableStreamDefaultController<any>): boolean {
  const stream = controller._controlledReadableStream;

  if (ReadableStreamDefaultControllerCanCloseOrEnqueue(controller) === false) {
    return false;
  }

  if (controller._started === false) {
    return false;
  }

  if (IsReadableStreamLocked(stream) === true && ReadableStreamGetNumReadRequests(stream) > 0) {
    return true;
  }

  const desiredSize = ReadableStreamDefaultControllerGetDesiredSize(controller);
  assert(desiredSize !== null);
  if (desiredSize! > 0) {
    return true;
  }

  return false;
}

/** Drops the controller's references to the pull, cancel and size algorithms. */
function ReadableStreamDefaultControllerClearAlgorithms(controller: ReadableStreamDefaultController<any>) {
  controller._pullAlgorithm = undefined!;
  controller._cancelAlgorithm = undefined!;
  controller._strategySizeAlgorithm = undefined!;
}

// A client of ReadableStreamDefaultController may use these functions directly to bypass state check.

/**
 * Records the close request and, if the queue is already empty, clears the algorithms and closes the stream
 * immediately. Otherwise closing is completed by `[PullSteps]` when the queue drains.
 */
function ReadableStreamDefaultControllerClose(controller: ReadableStreamDefaultController<any>) {
  const stream = controller._controlledReadableStream;

  assert(ReadableStreamDefaultControllerCanCloseOrEnqueue(controller) === true);

  controller._closeRequested = true;

  if (controller._queue.length === 0) {
    ReadableStreamDefaultControllerClearAlgorithms(controller);
    ReadableStreamClose(stream);
  }
}

/**
 * Delivers `chunk` to a pending read request if the stream is locked and has one; otherwise sizes the chunk with
 * the strategy's size algorithm and queues it.
 *
 * A failure while sizing or queueing errors the controller and is rethrown to the caller.
 */
function ReadableStreamDefaultControllerEnqueue<R>(controller: ReadableStreamDefaultController<R>, chunk: R): void {
  const stream = controller._controlledReadableStream;

  assert(ReadableStreamDefaultControllerCanCloseOrEnqueue(controller) === true);

  if (IsReadableStreamLocked(stream) === true && ReadableStreamGetNumReadRequests(stream) > 0) {
    ReadableStreamFulfillReadRequest(stream, chunk, false);
  } else {
    let chunkSize;
    try {
      chunkSize = controller._strategySizeAlgorithm(chunk);
    } catch (chunkSizeE) {
      ReadableStreamDefaultControllerError(controller, chunkSizeE);
      throw chunkSizeE;
    }

    try {
      EnqueueValueWithSize(controller, chunk, chunkSize);
    } catch (enqueueE) {
      ReadableStreamDefaultControllerError(controller, enqueueE);
      throw enqueueE;
    }
  }

  ReadableStreamDefaultControllerCallPullIfNeeded(controller);
}

/** Errors the stream with `e`; does nothing unless the stream is still 'readable'. */
function ReadableStreamDefaultControllerError(controller: ReadableStreamDefaultController<any>, e: any) {
  const stream = controller._controlledReadableStream;

  if (stream._state !== 'readable') {
    return;
  }

  ResetQueue(controller);

  ReadableStreamDefaultControllerClearAlgorithms(controller);
  ReadableStreamError(stream, e);
}

/**
 * @returns `null` for an errored stream, `0` for a closed stream, otherwise the high-water mark minus the total
 *   size currently queued.
 */
function ReadableStreamDefaultControllerGetDesiredSize(controller: ReadableStreamDefaultController<any>): number | null {
  const stream = controller._controlledReadableStream;
  const state = stream._state;

  if (state === 'errored') {
    return null;
  }
  if (state === 'closed') {
    return 0;
  }

  return controller._strategyHWM - controller._queueTotalSize;
}

// This is used in the implementation of TransformStream.
/** Backpressure is the exact negation of `ReadableStreamDefaultControllerShouldCallPull`. */
function ReadableStreamDefaultControllerHasBackpressure(controller: ReadableStreamDefaultController<any>): boolean {
  if (ReadableStreamDefaultControllerShouldCallPull(controller) === true) {
    return false;
  }

  return true;
}

/** True only while no close has been requested and the stream is 'readable'. */
function ReadableStreamDefaultControllerCanCloseOrEnqueue(controller: ReadableStreamDefaultController<any>): boolean {
  const state = controller._controlledReadableStream._state;

  if (controller._closeRequested === false && state === 'readable') {
    return true;
  }

  return false;
}

/**
 * Initialises `controller`, attaches it to `stream` and runs the start algorithm.
 *
 * Once the start result settles successfully the controller is flagged as started and a first pull is attempted;
 * if it rejects, the controller is errored with the rejection reason.
 */
function SetUpReadableStreamDefaultController<R>(
  stream: ReadableStream<R>,
  controller: ReadableStreamDefaultController<R>,
  startAlgorithm: () => void | PromiseLike<void>,
  pullAlgorithm: () => Promise<void>,
  cancelAlgorithm: (reason: any) => Promise<void>,
  highWaterMark: number,
  sizeAlgorithm: QueuingStrategySizeCallback<R>
) {
  assert(stream._readableStreamController === undefined);

  controller._controlledReadableStream = stream;

  controller._queue = undefined!;
  controller._queueTotalSize = undefined!;
  ResetQueue(controller);

  controller._started = false;
  controller._closeRequested = false;
  controller._pullAgain = false;
  controller._pulling = false;

  controller._strategySizeAlgorithm = sizeAlgorithm;
  controller._strategyHWM = highWaterMark;

  controller._pullAlgorithm = pullAlgorithm;
  controller._cancelAlgorithm = cancelAlgorithm;

  stream._readableStreamController = controller;

  const startResult = startAlgorithm();
  Promise.resolve(startResult).then(
    () => {
      controller._started = true;

      assert(controller._pulling === false);
      assert(controller._pullAgain === false);

      ReadableStreamDefaultControllerCallPullIfNeeded(controller);
    },
    r => {
      ReadableStreamDefaultControllerError(controller, r);
    }
  ).catch(rethrowAssertionErrorRejection);
}

/**
 * Creates a default controller (bypassing its throwing constructor via `Object.create`), derives the start, pull
 * and cancel algorithms from `underlyingSource`, and passes everything to `SetUpReadableStreamDefaultController`.
 */
function SetUpReadableStreamDefaultControllerFromUnderlyingSource<R>(
  stream: ReadableStream<R>,
  underlyingSource: UnderlyingSource<R>,
  highWaterMark: number,
  sizeAlgorithm: QueuingStrategySizeCallback<R>
) {
  assert(underlyingSource !== undefined);

  const controller: ReadableStreamDefaultController<R> = Object.create(ReadableStreamDefaultController.prototype);

  function startAlgorithm() {
    return InvokeOrNoop<typeof underlyingSource, 'start'>(underlyingSource, 'start', [controller]);
  }

  const pullAlgorithm = CreateAlgorithmFromUnderlyingMethod<typeof underlyingSource, 'pull'>(
    underlyingSource, 'pull', 0, [controller]
  );
  const cancelAlgorithm = CreateAlgorithmFromUnderlyingMethod<typeof underlyingSource, 'cancel'>(
    underlyingSource, 'cancel', 1, []
  );

  SetUpReadableStreamDefaultController(stream, controller, startAlgorithm, pullAlgorithm, cancelAlgorithm,
                                       highWaterMark, sizeAlgorithm);
}

export type ReadableStreamBYOBRequestType = ReadableStreamBYOBRequest;

/**
 * A pending BYOB request exposed through `ReadableByteStreamController.byobRequest`.
 *
 * The constructor always throws; instances are created with `Object.create(prototype)` and initialised by
 * `SetUpReadableStreamBYOBRequest`. A request is invalidated by resetting its controller and view to `undefined`.
 */
class ReadableStreamBYOBRequest {
  /** @internal */
  _associatedReadableByteStreamController!: ReadableByteStreamController;
  /** @internal */
  _view!: ArrayBufferView;

  /** @internal */
  constructor() {
    throw new TypeError('ReadableStreamBYOBRequest cannot be used directly');
  }

  /** The view the underlying source is expected to write into. */
  get view(): ArrayBufferView {
    if (IsReadableStreamBYOBRequest(this) === false) {
      throw byobRequestBrandCheckException('view');
    }

    return this._view;
  }

  /**
   * Reports that `bytesWritten` bytes were written into `view`.
   *
   * @throws TypeError when the request has been invalidated or the view's buffer is detached.
   */
  respond(bytesWritten: number): void {
    if (IsReadableStreamBYOBRequest(this) === false) {
      throw byobRequestBrandCheckException('respond');
    }

    if (this._associatedReadableByteStreamController === undefined) {
      throw new TypeError('This BYOB request has been invalidated');
    }

    if (IsDetachedBuffer(this._view.buffer) === true) {
      throw new TypeError('The BYOB request\'s buffer has been detached and so cannot be used as a response');
    }

    ReadableByteStreamControllerRespond(this._associatedReadableByteStreamController, bytesWritten);
  }

  /**
   * Responds with a caller-supplied view instead of the request's own view.
   *
   * @throws TypeError when the request has been invalidated, `view` is not an ArrayBuffer view, or its buffer is
   *   detached.
   */
  respondWithNewView(view: ArrayBufferView): void {
    if (IsReadableStreamBYOBRequest(this) === false) {
      // Name this method in the brand-check failure (the original passed 'respond' here, a copy/paste slip).
      throw byobRequestBrandCheckException('respondWithNewView');
    }

    if (this._associatedReadableByteStreamController === undefined) {
      throw new TypeError('This BYOB request has been invalidated');
    }

    if (!ArrayBuffer.isView(view)) {
      throw new TypeError('You can only respond with array buffer views');
    }

    if (IsDetachedBuffer(view.buffer) === true) {
      throw new TypeError('The supplied view\'s buffer has been detached and so cannot be used as a response');
    }

    ReadableByteStreamControllerRespondWithNewView(this._associatedReadableByteStreamController, view);
  }
}

/** Constructor shape shared by the array-buffer view classes used for pull-into descriptors. */
interface ArrayBufferViewConstructor<T extends ArrayBufferView = ArrayBufferView> {
  new(buffer: ArrayBufferLike, byteOffset: number, length?: number): T;

  readonly prototype: T;
  readonly BYTES_PER_ELEMENT: number;
}

/** One queued chunk of bytes: a buffer plus the byte window inside it. */
interface ByteQueueElement {
  buffer: ArrayBufferLike;
  byteOffset: number;
  byteLength: number;
}

type PullIntoDescriptor<T extends ArrayBufferView = ArrayBufferView> =
  DefaultPullIntoDescriptor
  | BYOBPullIntoDescriptor;

/** Descriptor created for a default reader when the controller auto-allocates a buffer. */
interface DefaultPullIntoDescriptor {
  buffer: ArrayBufferLike;
  byteOffset: number;
  byteLength: number;
  bytesFilled: number;
  elementSize: number;
  ctor: ArrayBufferViewConstructor<Uint8Array>;
  readerType: 'default';
}

/** Descriptor created for a BYOB read; `ctor` rebuilds a view of the caller's original type. */
interface BYOBPullIntoDescriptor<T extends ArrayBufferView = ArrayBufferView> {
  buffer: ArrayBufferLike;
  byteOffset: number;
  byteLength: number;
  bytesFilled: number;
  elementSize: number;
  ctor: ArrayBufferViewConstructor<T>;
  readerType: 'byob';
}

export type ReadableByteStreamControllerType = ReadableByteStreamController;

/**
 * Controller of a readable byte stream.
 *
 * The constructor always throws; instances are created elsewhere. Every public member first brand-checks `this`
 * with `IsReadableByteStreamController`.
 */
class ReadableByteStreamController {
  /** @internal */
  _controlledReadableByteStream!: ReadableByteStream;
  /** @internal */
  _queue!: SimpleQueue<ByteQueueElement>;
  /** @internal */
  _queueTotalSize!: number;
  /** @internal */
  _started!: boolean;
  /** @internal */
  _closeRequested!: boolean;
  /** @internal */
  _pullAgain!: boolean;
  /** @internal */
  _pulling!: boolean;
  /** @internal */
  _strategyHWM!: number;
  /** @internal */
  _pullAlgorithm!: () => Promise<void>;
  /** @internal */
  _cancelAlgorithm!: (reason: any) => Promise<void>;
  /** @internal */
  _autoAllocateChunkSize: number | undefined;
  /** @internal */
  _byobRequest: ReadableStreamBYOBRequest | undefined;
  /** @internal */
  _pendingPullIntos!: SimpleQueue<PullIntoDescriptor>;

  /** @internal */
  constructor() {
    throw new TypeError('ReadableByteStreamController constructor cannot be used directly');
  }

  /**
   * The current BYOB request, created lazily from the first pending pull-into descriptor; `undefined` when there
   * is no cached request and no pending descriptor.
   */
  get byobRequest(): ReadableStreamBYOBRequest | undefined {
    if (IsReadableByteStreamController(this) === false) {
      throw byteStreamControllerBrandCheckException('byobRequest');
    }

    if (this._byobRequest === undefined && this._pendingPullIntos.length > 0) {
      const firstDescriptor = this._pendingPullIntos.peek();
      // Expose only the part of the descriptor's window that has not been filled yet.
      const view = new Uint8Array(firstDescriptor.buffer,
                                  firstDescriptor.byteOffset + firstDescriptor.bytesFilled,
                                  firstDescriptor.byteLength - firstDescriptor.bytesFilled);

      const byobRequest: ReadableStreamBYOBRequest = Object.create(ReadableStreamBYOBRequest.prototype);
      SetUpReadableStreamBYOBRequest(byobRequest, this, view);
      this._byobRequest = byobRequest;
    }

    return this._byobRequest;
  }

  /** Result of `ReadableByteStreamControllerGetDesiredSize` for this controller. */
  get desiredSize(): number | null {
    if (IsReadableByteStreamController(this) === false) {
      throw byteStreamControllerBrandCheckException('desiredSize');
    }

    return ReadableByteStreamControllerGetDesiredSize(this);
  }

  /**
   * Requests that the stream be closed.
   *
   * @throws TypeError when close was already requested or the stream is not 'readable'.
   */
  close(): void {
    if (IsReadableByteStreamController(this) === false) {
      throw byteStreamControllerBrandCheckException('close');
    }

    if (this._closeRequested === true) {
      throw new TypeError('The stream has already been closed; do not close it again!');
    }

    const state = this._controlledReadableByteStream._state;
    if (state !== 'readable') {
      throw new TypeError(`The stream (in ${state} state) is not in the readable state and cannot be closed`);
    }

    ReadableByteStreamControllerClose(this);
  }

  /**
   * Hands the bytes of `chunk` to the stream.
   *
   * @throws TypeError when close was requested, the stream is not 'readable', `chunk` is not an ArrayBuffer view,
   *   or its buffer is detached.
   */
  enqueue(chunk: ArrayBufferView): void {
    if (IsReadableByteStreamController(this) === false) {
      throw byteStreamControllerBrandCheckException('enqueue');
    }

    if (this._closeRequested === true) {
      throw new TypeError('stream is closed or draining');
    }

    const state = this._controlledReadableByteStream._state;
    if (state !== 'readable') {
      throw new TypeError(`The stream (in ${state} state) is not in the readable state and cannot be enqueued to`);
    }

    if (!ArrayBuffer.isView(chunk)) {
      throw new TypeError('You can only enqueue array buffer views when using a ReadableByteStreamController');
    }

    if (IsDetachedBuffer(chunk.buffer) === true) {
      throw new TypeError('Cannot enqueue a view onto a detached ArrayBuffer');
    }

    ReadableByteStreamControllerEnqueue(this, chunk);
  }

  /** Errors the stream with `e` (a no-op unless the stream is still 'readable'). */
  error(e: any): void {
    if (IsReadableByteStreamController(this) === false) {
      throw byteStreamControllerBrandCheckException('error');
    }

    ReadableByteStreamControllerError(this, e);
  }

  /** @internal */
  [CancelSteps](reason: any): Promise<void> {
    if (this._pendingPullIntos.length > 0) {
      const firstDescriptor = this._pendingPullIntos.peek();
      firstDescriptor.bytesFilled = 0;
    }

    ResetQueue(this);

    // The cancel algorithm must be invoked before the algorithms are cleared.
    const result = this._cancelAlgorithm(reason);
    ReadableByteStreamControllerClearAlgorithms(this);
    return result;
  }

  /** @internal */
  [PullSteps](): Promise<ReadResult<ArrayBufferView>> {
    const stream = this._controlledReadableByteStream;
    assert(ReadableStreamHasDefaultReader(stream) === true);

    if (this._queueTotalSize > 0) {
      assert(ReadableStreamGetNumReadRequests(stream) === 0);

      const entry = this._queue.shift()!;
      this._queueTotalSize -= entry.byteLength;

      ReadableByteStreamControllerHandleQueueDrain(this);

      let view: ArrayBufferView;
      try {
        view = new Uint8Array(entry.buffer, entry.byteOffset, entry.byteLength);
      } catch (viewE) {
        return Promise.reject(viewE);
      }

      return Promise.resolve(ReadableStreamCreateReadResult(view, false, stream._reader!._forAuthorCode));
    }

    const autoAllocateChunkSize = this._autoAllocateChunkSize;
    if (autoAllocateChunkSize !== undefined) {
      let buffer: ArrayBuffer;
      try {
        buffer = new ArrayBuffer(autoAllocateChunkSize);
      } catch (bufferE) {
        return Promise.reject(bufferE);
      }

      const pullIntoDescriptor: DefaultPullIntoDescriptor = {
        buffer,
        byteOffset: 0,
        byteLength: autoAllocateChunkSize,
        bytesFilled: 0,
        elementSize: 1,
        ctor: Uint8Array,
        readerType: 'default'
      };

      this._pendingPullIntos.push(pullIntoDescriptor);
    }

    const promise = ReadableStreamAddReadRequest(stream);

    ReadableByteStreamControllerCallPullIfNeeded(this);

    return promise;
  }
}

// Abstract operations for the ReadableByteStreamController.

/** Brand check: true when `x` is an object that has its own `_controlledReadableByteStream` property. */
function IsReadableByteStreamController(x: any): x is ReadableByteStreamController {
  if (!typeIsObject(x)) {
    return false;
  }

  if (!Object.prototype.hasOwnProperty.call(x, '_controlledReadableByteStream')) {
    return false;
  }

  return true;
}

/** Brand check: true when `x` is an object that has its own `_associatedReadableByteStreamController` property. */
function IsReadableStreamBYOBRequest(x: any): x is ReadableStreamBYOBRequest {
  if (!typeIsObject(x)) {
    return false;
  }

  if (!Object.prototype.hasOwnProperty.call(x, '_associatedReadableByteStreamController')) {
    return false;
  }

  return true;
}

/**
 * Invokes the pull algorithm when `ReadableByteStreamControllerShouldCallPull` says so.
 *
 * If a pull is already in flight, `_pullAgain` is set instead so that another pull is attempted once the current
 * one fulfills. A rejected pull errors the controller.
 */
function ReadableByteStreamControllerCallPullIfNeeded(controller: ReadableByteStreamController): void {
  const shouldPull = ReadableByteStreamControllerShouldCallPull(controller);
  if (shouldPull === false) {
    return;
  }

  if (controller._pulling === true) {
    controller._pullAgain = true;
    return;
  }

  assert(controller._pullAgain === false);

  controller._pulling = true;

  // TODO: Test controller argument
  const pullPromise = controller._pullAlgorithm();
  pullPromise.then(
    () => {
      controller._pulling = false;

      if (controller._pullAgain === true) {
        controller._pullAgain = false;
        ReadableByteStreamControllerCallPullIfNeeded(controller);
      }
    },
    e => {
      ReadableByteStreamControllerError(controller, e);
    }
  ).catch(rethrowAssertionErrorRejection);
}

/** Invalidates the current BYOB request and replaces the pending pull-into queue with an empty one. */
function ReadableByteStreamControllerClearPendingPullIntos(controller: ReadableByteStreamController) {
  ReadableByteStreamControllerInvalidateBYOBRequest(controller);
  controller._pendingPullIntos = new SimpleQueue();
}

/**
 * Converts a finished descriptor into a view and fulfills the matching request: a read request for a 'default'
 * descriptor, a read-into request for a 'byob' one. The result is flagged `done` when the stream is closed.
 */
function ReadableByteStreamControllerCommitPullIntoDescriptor<T extends ArrayBufferView>(
  stream: ReadableByteStream,
  pullIntoDescriptor: PullIntoDescriptor<T>
) {
  assert(stream._state !== 'errored');

  let done = false;
  if (stream._state === 'closed') {
    assert(pullIntoDescriptor.bytesFilled === 0);
    done = true;
  }

  const filledView = ReadableByteStreamControllerConvertPullIntoDescriptor<T>(pullIntoDescriptor);
  if (pullIntoDescriptor.readerType === 'default') {
    ReadableStreamFulfillReadRequest(stream, filledView as unknown as Uint8Array, done);
  } else {
    assert(pullIntoDescriptor.readerType === 'byob');
    ReadableStreamFulfillReadIntoRequest(stream, filledView, done);
  }
}

/**
 * Builds a view over the filled part of the descriptor using the descriptor's own constructor.
 *
 * @returns A view of `bytesFilled / elementSize` elements starting at the descriptor's byte offset.
 */
function ReadableByteStreamControllerConvertPullIntoDescriptor<T extends ArrayBufferView>(
  pullIntoDescriptor: PullIntoDescriptor<T>
): T {
  const bytesFilled = pullIntoDescriptor.bytesFilled;
  const elementSize = pullIntoDescriptor.elementSize;

  assert(bytesFilled <= pullIntoDescriptor.byteLength);
  assert(bytesFilled % elementSize === 0);

  return new pullIntoDescriptor.ctor(
    pullIntoDescriptor.buffer, pullIntoDescriptor.byteOffset, bytesFilled / elementSize) as T;
}

/** Appends a byte window to the controller's queue and adds its length to the queued total. */
function ReadableByteStreamControllerEnqueueChunkToQueue(
  controller: ReadableByteStreamController,
  buffer: ArrayBufferLike,
  byteOffset: number,
  byteLength: number
) {
  controller._queue.push({ buffer, byteOffset, byteLength });
  controller._queueTotalSize += byteLength;
}

/**
 * Copies as many queued bytes as possible into `pullIntoDescriptor`.
 *
 * When the copy can complete at least one more whole element, only whole elements are copied and the descriptor is
 * reported ready. Otherwise everything available is copied (leaving less than one element filled) and the
 * descriptor is not ready.
 *
 * @returns `true` when the descriptor is ready to be committed.
 */
function ReadableByteStreamControllerFillPullIntoDescriptorFromQueue(
  controller: ReadableByteStreamController,
  pullIntoDescriptor: PullIntoDescriptor
) {
  const elementSize = pullIntoDescriptor.elementSize;

  const currentAlignedBytes = pullIntoDescriptor.bytesFilled - pullIntoDescriptor.bytesFilled % elementSize;

  const maxBytesToCopy = Math.min(controller._queueTotalSize,
                                  pullIntoDescriptor.byteLength - pullIntoDescriptor.bytesFilled);
  const maxBytesFilled = pullIntoDescriptor.bytesFilled + maxBytesToCopy;
  const maxAlignedBytes = maxBytesFilled - maxBytesFilled % elementSize;

  let totalBytesToCopyRemaining = maxBytesToCopy;
  let ready = false;
  if (maxAlignedBytes > currentAlignedBytes) {
    // Stop at the last element boundary that can be reached with the queued bytes.
    totalBytesToCopyRemaining = maxAlignedBytes - pullIntoDescriptor.bytesFilled;
    ready = true;
  }

  const queue = controller._queue;

  while (totalBytesToCopyRemaining > 0) {
    const headOfQueue = queue.peek();

    const bytesToCopy = Math.min(totalBytesToCopyRemaining, headOfQueue.byteLength);

    const destStart = pullIntoDescriptor.byteOffset + pullIntoDescriptor.bytesFilled;
    ArrayBufferCopy(pullIntoDescriptor.buffer, destStart, headOfQueue.buffer, headOfQueue.byteOffset, bytesToCopy);

    if (headOfQueue.byteLength === bytesToCopy) {
      queue.shift();
    } else {
      // Partially consumed: shrink the head entry in place.
      headOfQueue.byteOffset += bytesToCopy;
      headOfQueue.byteLength -= bytesToCopy;
    }
    controller._queueTotalSize -= bytesToCopy;

    ReadableByteStreamControllerFillHeadPullIntoDescriptor(controller, bytesToCopy, pullIntoDescriptor);

    totalBytesToCopyRemaining -= bytesToCopy;
  }

  if (ready === false) {
    assert(controller._queueTotalSize === 0);
    assert(pullIntoDescriptor.bytesFilled > 0);
    assert(pullIntoDescriptor.bytesFilled < pullIntoDescriptor.elementSize);
  }

  return ready;
}

/** Invalidates the current BYOB request and advances the descriptor's `bytesFilled` by `size`. */
function ReadableByteStreamControllerFillHeadPullIntoDescriptor(
  controller: ReadableByteStreamController,
  size: number,
  pullIntoDescriptor: PullIntoDescriptor
) {
  assert(controller._pendingPullIntos.length === 0 || controller._pendingPullIntos.peek() === pullIntoDescriptor);

  ReadableByteStreamControllerInvalidateBYOBRequest(controller);
  pullIntoDescriptor.bytesFilled += size;
}

/**
 * Called after bytes were taken from the queue: closes the stream if the queue is now empty and close was
 * requested, otherwise attempts another pull.
 */
function ReadableByteStreamControllerHandleQueueDrain(controller: ReadableByteStreamController) {
  assert(controller._controlledReadableByteStream._state === 'readable');

  if (controller._queueTotalSize === 0 && controller._closeRequested === true) {
    ReadableByteStreamControllerClearAlgorithms(controller);
    ReadableStreamClose(controller._controlledReadableByteStream);
  } else {
    ReadableByteStreamControllerCallPullIfNeeded(controller);
  }
}

/** Detaches the cached BYOB request (if any) from the controller and clears its controller and view fields. */
function ReadableByteStreamControllerInvalidateBYOBRequest(controller: ReadableByteStreamController) {
  if (controller._byobRequest === undefined) {
    return;
  }

  controller._byobRequest._associatedReadableByteStreamController = undefined!;
  controller._byobRequest._view = undefined!;
  controller._byobRequest = undefined;
}

/**
 * Fills pending pull-into descriptors from the queue, in order, committing each one that becomes ready. Stops as
 * soon as the queue is empty.
 */
function ReadableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(controller: ReadableByteStreamController) {
  assert(controller._closeRequested === false);

  while (controller._pendingPullIntos.length > 0) {
    if (controller._queueTotalSize === 0) {
      return;
    }

    const pullIntoDescriptor = controller._pendingPullIntos.peek();

    if (ReadableByteStreamControllerFillPullIntoDescriptorFromQueue(controller, pullIntoDescriptor) === true) {
      ReadableByteStreamControllerShiftPendingPullInto(controller);

      ReadableByteStreamControllerCommitPullIntoDescriptor(
        controller._controlledReadableByteStream,
        pullIntoDescriptor
      );
    }
  }
}

/**
 * Services a BYOB read into `view`.
 *
 * The view's buffer is passed through `TransferArrayBuffer` and wrapped in a 'byob' descriptor. Depending on
 * state the read is queued behind earlier pull-intos, answered with an empty `done` view (closed stream), filled
 * straight from the queue, or left pending while a pull is attempted. If close was requested and the queue cannot
 * complete an element, the controller is errored and the read rejects with the same TypeError.
 */
function ReadableByteStreamControllerPullInto<T extends ArrayBufferView>(
  controller: ReadableByteStreamController,
  view: T
): Promise<ReadResult<T>> {
  const stream = controller._controlledReadableByteStream;

  // DataView is treated as having 1-byte elements; every other view reports its own element size.
  let elementSize = 1;
  if (view.constructor !== DataView) {
    elementSize = (view.constructor as ArrayBufferViewConstructor<T>).BYTES_PER_ELEMENT;
  }

  const ctor = view.constructor as ArrayBufferViewConstructor<T>;

  const buffer = TransferArrayBuffer(view.buffer);
  const pullIntoDescriptor: BYOBPullIntoDescriptor<T> = {
    buffer,
    byteOffset: view.byteOffset,
    byteLength: view.byteLength,
    bytesFilled: 0,
    elementSize,
    ctor,
    readerType: 'byob'
  };

  if (controller._pendingPullIntos.length > 0) {
    controller._pendingPullIntos.push(pullIntoDescriptor);

    // No ReadableByteStreamControllerCallPullIfNeeded() call since:
    // - No change happens on desiredSize
    // - The source has already been notified of that there's at least 1 pending read(view)

    return ReadableStreamAddReadIntoRequest(stream);
  }

  if (stream._state === 'closed') {
    const emptyView = new ctor(pullIntoDescriptor.buffer, pullIntoDescriptor.byteOffset, 0);
    return Promise.resolve(ReadableStreamCreateReadResult(emptyView, true, stream._reader!._forAuthorCode));
  }

  if (controller._queueTotalSize > 0) {
    if (ReadableByteStreamControllerFillPullIntoDescriptorFromQueue(controller, pullIntoDescriptor) === true) {
      const filledView = ReadableByteStreamControllerConvertPullIntoDescriptor<T>(pullIntoDescriptor);

      ReadableByteStreamControllerHandleQueueDrain(controller);

      return Promise.resolve(ReadableStreamCreateReadResult(filledView, false, stream._reader!._forAuthorCode));
    }

    if (controller._closeRequested === true) {
      const e = new TypeError('Insufficient bytes to fill elements in the given buffer');
      ReadableByteStreamControllerError(controller, e);

      return Promise.reject(e);
    }
  }

  controller._pendingPullIntos.push(pullIntoDescriptor);

  const promise = ReadableStreamAddReadIntoRequest<T>(stream);

  ReadableByteStreamControllerCallPullIfNeeded(controller);

  return promise;
}

/**
 * Handles a respond on a closed stream: with a BYOB reader attached, shifts and commits a pending descriptor for
 * every outstanding read-into request.
 */
function ReadableByteStreamControllerRespondInClosedState(
  controller: ReadableByteStreamController,
  firstDescriptor: PullIntoDescriptor
) {
  firstDescriptor.buffer = TransferArrayBuffer(firstDescriptor.buffer);

  assert(firstDescriptor.bytesFilled === 0);

  const stream = controller._controlledReadableByteStream;
  if (ReadableStreamHasBYOBReader(stream) === true) {
    while (ReadableStreamGetNumReadIntoRequests(stream) > 0) {
      const pullIntoDescriptor = ReadableByteStreamControllerShiftPendingPullInto(controller);
      ReadableByteStreamControllerCommitPullIntoDescriptor(stream, pullIntoDescriptor);
    }
  }
}

/**
 * Handles a respond on a readable stream.
 *
 * Records `bytesWritten` on the descriptor. If less than one whole element is filled, nothing more happens.
 * Otherwise the descriptor is shifted, any trailing partial element is copied back onto the queue, the whole
 * elements are committed, and further pending descriptors are served from the queue.
 *
 * @throws RangeError when `bytesWritten` would overflow the descriptor's byte length.
 */
function ReadableByteStreamControllerRespondInReadableState(
  controller: ReadableByteStreamController,
  bytesWritten: number,
  pullIntoDescriptor: PullIntoDescriptor
) {
  if (pullIntoDescriptor.bytesFilled + bytesWritten > pullIntoDescriptor.byteLength) {
    throw new RangeError('bytesWritten out of range');
  }

  ReadableByteStreamControllerFillHeadPullIntoDescriptor(controller, bytesWritten, pullIntoDescriptor);

  if (pullIntoDescriptor.bytesFilled < pullIntoDescriptor.elementSize) {
    // TODO: Figure out whether we should detach the buffer or not here.
    return;
  }

  ReadableByteStreamControllerShiftPendingPullInto(controller);

  const remainderSize = pullIntoDescriptor.bytesFilled % pullIntoDescriptor.elementSize;
  if (remainderSize > 0) {
    const end = pullIntoDescriptor.byteOffset + pullIntoDescriptor.bytesFilled;
    const remainder = pullIntoDescriptor.buffer.slice(end - remainderSize, end);
    ReadableByteStreamControllerEnqueueChunkToQueue(controller, remainder, 0, remainder.byteLength);
  }

  pullIntoDescriptor.buffer = TransferArrayBuffer(pullIntoDescriptor.buffer);
  pullIntoDescriptor.bytesFilled -= remainderSize;
  ReadableByteStreamControllerCommitPullIntoDescriptor(controller._controlledReadableByteStream, pullIntoDescriptor);

  ReadableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(controller);
}

/**
 * Routes a respond to the closed-state or readable-state handler for the first pending descriptor, then attempts
 * another pull.
 *
 * @throws TypeError when the stream is closed and `bytesWritten` is not 0.
 */
function ReadableByteStreamControllerRespondInternal(controller: ReadableByteStreamController, bytesWritten: number) {
  const firstDescriptor = controller._pendingPullIntos.peek();

  const stream = controller._controlledReadableByteStream;

  if (stream._state === 'closed') {
    if (bytesWritten !== 0) {
      throw new TypeError('bytesWritten must be 0 when calling respond() on a closed stream');
    }

    ReadableByteStreamControllerRespondInClosedState(controller, firstDescriptor);
  } else {
    assert(stream._state === 'readable');

    ReadableByteStreamControllerRespondInReadableState(controller, bytesWritten, firstDescriptor);
  }

  ReadableByteStreamControllerCallPullIfNeeded(controller);
}

/** Removes and returns the first pending pull-into descriptor, invalidating the current BYOB request. */
function ReadableByteStreamControllerShiftPendingPullInto(controller: ReadableByteStreamController): PullIntoDescriptor {
  const descriptor = controller._pendingPullIntos.shift()!;
  ReadableByteStreamControllerInvalidateBYOBRequest(controller);
  return descriptor;
}

/**
 * Decides whether the pull algorithm should run now.
 *
 * False when the stream is not 'readable', close was requested, or the controller has not started; true when a
 * default reader has pending read requests, a BYOB reader has pending read-into requests, or the desired size is
 * positive.
 */
function ReadableByteStreamControllerShouldCallPull(controller: ReadableByteStreamController): boolean {
  const stream = controller._controlledReadableByteStream;

  if (stream._state !== 'readable') {
    return false;
  }

  if (controller._closeRequested === true) {
    return false;
  }

  if (controller._started === false) {
    return false;
  }

  if (ReadableStreamHasDefaultReader(stream) === true && ReadableStreamGetNumReadRequests(stream) > 0) {
    return true;
  }

  if (ReadableStreamHasBYOBReader(stream) === true && ReadableStreamGetNumReadIntoRequests(stream) > 0) {
    return true;
  }

  const desiredSize = ReadableByteStreamControllerGetDesiredSize(controller);
  assert(desiredSize !== null);
  if (desiredSize! > 0) {
    return true;
  }

  return false;
}

/** Drops the controller's references to the pull and cancel algorithms. */
function ReadableByteStreamControllerClearAlgorithms(controller: ReadableByteStreamController) {
  controller._pullAlgorithm = undefined!;
  controller._cancelAlgorithm = undefined!;
}

// A client of ReadableByteStreamController may use these functions directly to bypass state check.

/**
 * Closes the stream, or only records the close request while bytes are still queued.
 *
 * @throws TypeError (after erroring the controller) when the first pending pull-into holds a partially filled
 *   element.
 */
function ReadableByteStreamControllerClose(controller: ReadableByteStreamController) {
  const stream = controller._controlledReadableByteStream;

  assert(controller._closeRequested === false);
  assert(stream._state === 'readable');

  if (controller._queueTotalSize > 0) {
    controller._closeRequested = true;

    return;
  }

  if (controller._pendingPullIntos.length > 0) {
    const firstPendingPullInto = controller._pendingPullIntos.peek();
    if (firstPendingPullInto.bytesFilled > 0) {
      const e = new TypeError('Insufficient bytes to fill elements in the given buffer');
      ReadableByteStreamControllerError(controller, e);

      throw e;
    }
  }

  ReadableByteStreamControllerClearAlgorithms(controller);
  ReadableStreamClose(stream);
}

/**
 * Hands the bytes of `chunk` to the stream after passing its buffer through `TransferArrayBuffer`.
 *
 * With a default reader the bytes either fulfill a pending read request directly or are queued; with a BYOB
 * reader they are queued and pending pull-intos are served from the queue; with no reader they are queued.
 * A pull is attempted afterwards.
 */
function ReadableByteStreamControllerEnqueue(controller: ReadableByteStreamController, chunk: ArrayBufferView) {
  const stream = controller._controlledReadableByteStream;

  assert(controller._closeRequested === false);
  assert(stream._state === 'readable');

  const buffer = chunk.buffer;
  const byteOffset = chunk.byteOffset;
  const byteLength = chunk.byteLength;
  const transferredBuffer = TransferArrayBuffer(buffer);

  if (ReadableStreamHasDefaultReader(stream) === true) {
    if (ReadableStreamGetNumReadRequests(stream) === 0) {
      ReadableByteStreamControllerEnqueueChunkToQueue(controller, transferredBuffer, byteOffset, byteLength);
    } else {
      assert(controller._queue.length === 0);

      const transferredView = new Uint8Array(transferredBuffer, byteOffset, byteLength);
      ReadableStreamFulfillReadRequest(stream, transferredView, false);
    }
  } else if (ReadableStreamHasBYOBReader(stream) === true) {
    // TODO: Ideally in this branch detaching should happen only if the buffer is not consumed fully.
    ReadableByteStreamControllerEnqueueChunkToQueue(controller, transferredBuffer, byteOffset, byteLength);
    ReadableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(controller);
  } else {
    assert(IsReadableStreamLocked(stream) === false);
    ReadableByteStreamControllerEnqueueChunkToQueue(controller, transferredBuffer, byteOffset, byteLength);
  }

  ReadableByteStreamControllerCallPullIfNeeded(controller);
}

/**
 * Errors the stream with `e`; does nothing unless the stream is still 'readable'. Pending pull-intos and the
 * queue are discarded and the algorithms cleared before the stream itself is errored.
 */
function ReadableByteStreamControllerError(controller: ReadableByteStreamController, e: any) {
  const stream = controller._controlledReadableByteStream;

  if (stream._state !== 'readable') {
    return;
  }

  ReadableByteStreamControllerClearPendingPullIntos(controller);

  ResetQueue(controller);
  ReadableByteStreamControllerClearAlgorithms(controller);
  ReadableStreamError(stream, e);
}

// (the excerpt ends inside the following signature; the remainder is not shown and is preserved as-is)
function ReadableByteStreamControllerGetDesiredSize(controller: ReadableByteStreamController): number | 
null {1800 const stream = controller._controlledReadableByteStream;1801 const state = stream._state;1802 if (state === 'errored') {1803 return null;1804 }1805 if (state === 'closed') {1806 return 0;1807 }1808 return controller._strategyHWM - controller._queueTotalSize;1809}1810function ReadableByteStreamControllerRespond(controller: ReadableByteStreamController, bytesWritten: number) {1811 bytesWritten = Number(bytesWritten);1812 if (IsFiniteNonNegativeNumber(bytesWritten) === false) {1813 throw new RangeError('bytesWritten must be a finite');1814 }1815 assert(controller._pendingPullIntos.length > 0);1816 ReadableByteStreamControllerRespondInternal(controller, bytesWritten);1817}1818function ReadableByteStreamControllerRespondWithNewView(controller: ReadableByteStreamController,1819 view: ArrayBufferView) {1820 assert(controller._pendingPullIntos.length > 0);1821 const firstDescriptor = controller._pendingPullIntos.peek();1822 if (firstDescriptor.byteOffset + firstDescriptor.bytesFilled !== view.byteOffset) {1823 throw new RangeError('The region specified by view does not match byobRequest');1824 }1825 if (firstDescriptor.byteLength !== view.byteLength) {1826 throw new RangeError('The buffer of view has different capacity than byobRequest');1827 }1828 firstDescriptor.buffer = view.buffer;1829 ReadableByteStreamControllerRespondInternal(controller, view.byteLength);1830}1831function SetUpReadableByteStreamController(stream: ReadableByteStream,1832 controller: ReadableByteStreamController,1833 startAlgorithm: () => void | PromiseLike<void>,1834 pullAlgorithm: () => Promise<void>,1835 cancelAlgorithm: (reason: any) => Promise<void>,1836 highWaterMark: number,1837 autoAllocateChunkSize: number | undefined) {1838 assert(stream._readableStreamController === undefined);1839 if (autoAllocateChunkSize !== undefined) {1840 assert(NumberIsInteger(autoAllocateChunkSize) === true);1841 assert(autoAllocateChunkSize > 0);1842 }1843 controller._controlledReadableByteStream = 
stream;1844 controller._pullAgain = false;1845 controller._pulling = false;1846 controller._byobRequest = undefined;1847 // Need to set the slots so that the assert doesn't fire. In the spec the slots already exist implicitly.1848 controller._queue = controller._queueTotalSize = undefined!;1849 ResetQueue(controller);1850 controller._closeRequested = false;1851 controller._started = false;1852 controller._strategyHWM = ValidateAndNormalizeHighWaterMark(highWaterMark);1853 controller._pullAlgorithm = pullAlgorithm;1854 controller._cancelAlgorithm = cancelAlgorithm;1855 controller._autoAllocateChunkSize = autoAllocateChunkSize;1856 controller._pendingPullIntos = new SimpleQueue();1857 stream._readableStreamController = controller;1858 const startResult = startAlgorithm();1859 Promise.resolve(startResult).then(1860 () => {1861 controller._started = true;1862 assert(controller._pulling === false);1863 assert(controller._pullAgain === false);1864 ReadableByteStreamControllerCallPullIfNeeded(controller);1865 },1866 r => {1867 ReadableByteStreamControllerError(controller, r);1868 }1869 ).catch(rethrowAssertionErrorRejection);1870}1871function SetUpReadableByteStreamControllerFromUnderlyingSource(stream: ReadableByteStream,1872 underlyingByteSource: UnderlyingByteSource,1873 highWaterMark: number) {1874 assert(underlyingByteSource !== undefined);1875 const controller: ReadableByteStreamController = Object.create(ReadableByteStreamController.prototype);1876 function startAlgorithm() {1877 return InvokeOrNoop<typeof underlyingByteSource, 'start'>(underlyingByteSource, 'start', [controller]);1878 }1879 const pullAlgorithm = CreateAlgorithmFromUnderlyingMethod<typeof underlyingByteSource, 'pull'>(1880 underlyingByteSource, 'pull', 0, [controller]1881 );1882 const cancelAlgorithm = CreateAlgorithmFromUnderlyingMethod<typeof underlyingByteSource, 'cancel'>(1883 underlyingByteSource, 'cancel', 1, []1884 );1885 let autoAllocateChunkSize = 
underlyingByteSource.autoAllocateChunkSize;1886 if (autoAllocateChunkSize !== undefined) {1887 autoAllocateChunkSize = Number(autoAllocateChunkSize);1888 if (NumberIsInteger(autoAllocateChunkSize) === false || autoAllocateChunkSize <= 0) {1889 throw new RangeError('autoAllocateChunkSize must be a positive integer');1890 }1891 }1892 SetUpReadableByteStreamController(stream, controller, startAlgorithm, pullAlgorithm, cancelAlgorithm, highWaterMark,1893 autoAllocateChunkSize);1894}1895function SetUpReadableStreamBYOBRequest(request: ReadableStreamBYOBRequest,1896 controller: ReadableByteStreamController,1897 view: ArrayBufferView) {1898 assert(IsReadableByteStreamController(controller) === true);1899 assert(typeof view === 'object');1900 assert(ArrayBuffer.isView(view) === true);1901 assert(IsDetachedBuffer(view.buffer) === false);1902 request._associatedReadableByteStreamController = controller;1903 request._view = view;1904}1905// Helper functions for the ReadableStream.1906function isAbortSignal(value: any): value is AbortSignal {1907 if (typeof value !== 'object' || value === null) {1908 return false;1909 }1910 // Use the brand check to distinguish a real AbortSignal from a fake one.1911 const aborted = Object.getOwnPropertyDescriptor(AbortSignal.prototype, 'aborted')!.get!;1912 try {1913 aborted.call(value);1914 return true;1915 } catch (e) {1916 return false;1917 }1918}1919function streamBrandCheckException(name: string): TypeError {1920 return new TypeError(`ReadableStream.prototype.${name} can only be used on a ReadableStream`);1921}1922function streamAsyncIteratorBrandCheckException(name: string): TypeError {1923 return new TypeError(`ReadableStreamAsyncIterator.${name} can only be used on a ReadableSteamAsyncIterator`);1924}1925// Helper functions for the readers.1926function readerLockException(name: string): TypeError {1927 return new TypeError('Cannot ' + name + ' a stream using a released reader');1928}1929// Helper functions for the 
ReadableStreamDefaultReader.1930function defaultReaderBrandCheckException(name: string): TypeError {1931 return new TypeError(1932 `ReadableStreamDefaultReader.prototype.${name} can only be used on a ReadableStreamDefaultReader`);1933}1934function defaultReaderClosedPromiseInitialize(reader: ReadableStreamReader<any>) {1935 reader._closedPromise = new Promise((resolve, reject) => {1936 reader._closedPromise_resolve = resolve;1937 reader._closedPromise_reject = reject;1938 });1939}1940function defaultReaderClosedPromiseInitializeAsRejected(reader: ReadableStreamReader<any>, reason: any) {1941 defaultReaderClosedPromiseInitialize(reader);1942 defaultReaderClosedPromiseReject(reader, reason);1943}1944function defaultReaderClosedPromiseInitializeAsResolved(reader: ReadableStreamReader<any>) {1945 defaultReaderClosedPromiseInitialize(reader);1946 defaultReaderClosedPromiseResolve(reader);1947}1948function defaultReaderClosedPromiseReject(reader: ReadableStreamReader<any>, reason: any) {1949 assert(reader._closedPromise_resolve !== undefined);1950 assert(reader._closedPromise_reject !== undefined);1951 reader._closedPromise.catch(() => {});1952 reader._closedPromise_reject!(reason);1953 reader._closedPromise_resolve = undefined;1954 reader._closedPromise_reject = undefined;1955}1956function defaultReaderClosedPromiseResetToRejected(reader: ReadableStreamReader<any>, reason: any) {1957 assert(reader._closedPromise_resolve === undefined);1958 assert(reader._closedPromise_reject === undefined);1959 defaultReaderClosedPromiseInitializeAsRejected(reader, reason);...

Full Screen

Full Screen

readable-stream.js

Source:readable-stream.js Github

copy

Full Screen

...600function ReadableStreamReaderGenericInitialize(reader, stream) {601 reader._ownerReadableStream = stream;602 stream._reader = reader;603 if (stream._state === 'readable') {604 defaultReaderClosedPromiseInitialize(reader);605 } else if (stream._state === 'closed') {606 defaultReaderClosedPromiseInitializeAsResolved(reader);607 } else {608 assert(stream._state === 'errored', 'state must be errored');609 defaultReaderClosedPromiseInitializeAsRejected(reader, stream._storedError);610 reader._closedPromise.catch(() => {});611 }612}613// A client of ReadableStreamDefaultReader and ReadableStreamBYOBReader may use these functions directly to bypass state614// check.615function ReadableStreamReaderGenericCancel(reader, reason) {616 const stream = reader._ownerReadableStream;617 assert(stream !== undefined);618 return ReadableStreamCancel(stream, reason);619}620function ReadableStreamReaderGenericRelease(reader) {621 assert(reader._ownerReadableStream !== undefined);622 assert(reader._ownerReadableStream._reader === reader);623 if (reader._ownerReadableStream._state === 'readable') {624 defaultReaderClosedPromiseReject(625 reader,626 new TypeError('Reader was released and can no longer be used to monitor the stream\'s closedness'));627 } else {628 defaultReaderClosedPromiseResetToRejected(629 reader,630 new TypeError('Reader was released and can no longer be used to monitor the stream\'s closedness'));631 }632 reader._closedPromise.catch(() => {});633 reader._ownerReadableStream._reader = undefined;634 reader._ownerReadableStream = undefined;635}636function ReadableStreamBYOBReaderRead(reader, view) {637 const stream = reader._ownerReadableStream;638 assert(stream !== undefined);639 stream._disturbed = true;640 if (stream._state === 'errored') {641 return Promise.reject(stream._storedError);642 }643 // Controllers must implement this.644 return ReadableByteStreamControllerPullInto(stream._readableStreamController, view);645}646function 
ReadableStreamDefaultReaderRead(reader) {647 const stream = reader._ownerReadableStream;648 assert(stream !== undefined);649 stream._disturbed = true;650 if (stream._state === 'closed') {651 return Promise.resolve(CreateIterResultObject(undefined, true));652 }653 if (stream._state === 'errored') {654 return Promise.reject(stream._storedError);655 }656 assert(stream._state === 'readable');657 return stream._readableStreamController[InternalPull]();658}659// Controllers660class ReadableStreamDefaultController {661 constructor(stream, underlyingSource, size, highWaterMark) {662 if (IsReadableStream(stream) === false) {663 throw new TypeError('ReadableStreamDefaultController can only be constructed with a ReadableStream instance');664 }665 if (stream._readableStreamController !== undefined) {666 throw new TypeError(667 'ReadableStreamDefaultController instances can only be created by the ReadableStream constructor');668 }669 this._controlledReadableStream = stream;670 this._underlyingSource = underlyingSource;671 this._queue = [];672 this._started = false;673 this._closeRequested = false;674 this._pullAgain = false;675 this._pulling = false;676 const normalizedStrategy = ValidateAndNormalizeQueuingStrategy(size, highWaterMark);677 this._strategySize = normalizedStrategy.size;678 this._strategyHWM = normalizedStrategy.highWaterMark;679 const controller = this;680 const startResult = InvokeOrNoop(underlyingSource, 'start', [this]);681 Promise.resolve(startResult).then(682 () => {683 controller._started = true;684 assert(controller._pulling === false);685 assert(controller._pullAgain === false);686 ReadableStreamDefaultControllerCallPullIfNeeded(controller);687 },688 r => {689 ReadableStreamDefaultControllerErrorIfNeeded(controller, r);690 }691 )692 .catch(rethrowAssertionErrorRejection);693 }694 get desiredSize() {695 if (IsReadableStreamDefaultController(this) === false) {696 throw defaultControllerBrandCheckException('desiredSize');697 }698 return 
ReadableStreamDefaultControllerGetDesiredSize(this);699 }700 close() {701 if (IsReadableStreamDefaultController(this) === false) {702 throw defaultControllerBrandCheckException('close');703 }704 if (this._closeRequested === true) {705 throw new TypeError('The stream has already been closed; do not close it again!');706 }707 const state = this._controlledReadableStream._state;708 if (state !== 'readable') {709 throw new TypeError(`The stream (in ${state} state) is not in the readable state and cannot be closed`);710 }711 ReadableStreamDefaultControllerClose(this);712 }713 enqueue(chunk) {714 if (IsReadableStreamDefaultController(this) === false) {715 throw defaultControllerBrandCheckException('enqueue');716 }717 if (this._closeRequested === true) {718 throw new TypeError('stream is closed or draining');719 }720 const state = this._controlledReadableStream._state;721 if (state !== 'readable') {722 throw new TypeError(`The stream (in ${state} state) is not in the readable state and cannot be enqueued to`);723 }724 return ReadableStreamDefaultControllerEnqueue(this, chunk);725 }726 error(e) {727 if (IsReadableStreamDefaultController(this) === false) {728 throw defaultControllerBrandCheckException('error');729 }730 const stream = this._controlledReadableStream;731 if (stream._state !== 'readable') {732 throw new TypeError(`The stream is ${stream._state} and so cannot be errored`);733 }734 ReadableStreamDefaultControllerError(this, e);735 }736 [InternalCancel](reason) {737 this._queue = [];738 return PromiseInvokeOrNoop(this._underlyingSource, 'cancel', [reason]);739 }740 [InternalPull]() {741 const stream = this._controlledReadableStream;742 if (this._queue.length > 0) {743 const chunk = DequeueValue(this._queue);744 if (this._closeRequested === true && this._queue.length === 0) {745 ReadableStreamClose(stream);746 } else {747 ReadableStreamDefaultControllerCallPullIfNeeded(this);748 }749 return Promise.resolve(CreateIterResultObject(chunk, false));750 }751 const 
pendingPromise = ReadableStreamAddReadRequest(stream);752 ReadableStreamDefaultControllerCallPullIfNeeded(this);753 return pendingPromise;754 }755}756// Abstract operations for the ReadableStreamDefaultController.757function IsReadableStreamDefaultController(x) {758 if (!typeIsObject(x)) {759 return false;760 }761 if (!Object.prototype.hasOwnProperty.call(x, '_underlyingSource')) {762 return false;763 }764 return true;765}766function ReadableStreamDefaultControllerCallPullIfNeeded(controller) {767 const shouldPull = ReadableStreamDefaultControllerShouldCallPull(controller);768 if (shouldPull === false) {769 return undefined;770 }771 if (controller._pulling === true) {772 controller._pullAgain = true;773 return undefined;774 }775 assert(controller._pullAgain === false);776 controller._pulling = true;777 const pullPromise = PromiseInvokeOrNoop(controller._underlyingSource, 'pull', [controller]);778 pullPromise.then(779 () => {780 controller._pulling = false;781 if (controller._pullAgain === true) {782 controller._pullAgain = false;783 return ReadableStreamDefaultControllerCallPullIfNeeded(controller);784 }785 return undefined;786 },787 e => {788 ReadableStreamDefaultControllerErrorIfNeeded(controller, e);789 }790 )791 .catch(rethrowAssertionErrorRejection);792 return undefined;793}794function ReadableStreamDefaultControllerShouldCallPull(controller) {795 const stream = controller._controlledReadableStream;796 if (stream._state === 'closed' || stream._state === 'errored') {797 return false;798 }799 if (controller._closeRequested === true) {800 return false;801 }802 if (controller._started === false) {803 return false;804 }805 if (IsReadableStreamLocked(stream) === true && ReadableStreamGetNumReadRequests(stream) > 0) {806 return true;807 }808 const desiredSize = ReadableStreamDefaultControllerGetDesiredSize(controller);809 if (desiredSize > 0) {810 return true;811 }812 return false;813}814// A client of ReadableStreamDefaultController may use these functions directly 
to bypass state check.815function ReadableStreamDefaultControllerClose(controller) {816 const stream = controller._controlledReadableStream;817 assert(controller._closeRequested === false);818 assert(stream._state === 'readable');819 controller._closeRequested = true;820 if (controller._queue.length === 0) {821 ReadableStreamClose(stream);822 }823}824function ReadableStreamDefaultControllerEnqueue(controller, chunk) {825 const stream = controller._controlledReadableStream;826 assert(controller._closeRequested === false);827 assert(stream._state === 'readable');828 if (IsReadableStreamLocked(stream) === true && ReadableStreamGetNumReadRequests(stream) > 0) {829 ReadableStreamFulfillReadRequest(stream, chunk, false);830 } else {831 let chunkSize = 1;832 if (controller._strategySize !== undefined) {833 const strategySize = controller._strategySize;834 try {835 chunkSize = strategySize(chunk);836 } catch (chunkSizeE) {837 ReadableStreamDefaultControllerErrorIfNeeded(controller, chunkSizeE);838 throw chunkSizeE;839 }840 }841 try {842 EnqueueValueWithSize(controller._queue, chunk, chunkSize);843 } catch (enqueueE) {844 ReadableStreamDefaultControllerErrorIfNeeded(controller, enqueueE);845 throw enqueueE;846 }847 }848 ReadableStreamDefaultControllerCallPullIfNeeded(controller);849 return undefined;850}851function ReadableStreamDefaultControllerError(controller, e) {852 const stream = controller._controlledReadableStream;853 assert(stream._state === 'readable');854 controller._queue = [];855 ReadableStreamError(stream, e);856}857function ReadableStreamDefaultControllerErrorIfNeeded(controller, e) {858 if (controller._controlledReadableStream._state === 'readable') {859 ReadableStreamDefaultControllerError(controller, e);860 }861}862function ReadableStreamDefaultControllerGetDesiredSize(controller) {863 const queueSize = GetTotalQueueSize(controller._queue);864 return controller._strategyHWM - queueSize;865}866class ReadableStreamBYOBRequest {867 constructor(controller, 
view) {868 this._associatedReadableByteStreamController = controller;869 this._view = view;870 }871 get view() {872 return this._view;873 }874 respond(bytesWritten) {875 if (IsReadableStreamBYOBRequest(this) === false) {876 throw byobRequestBrandCheckException('respond');877 }878 if (this._associatedReadableByteStreamController === undefined) {879 throw new TypeError('This BYOB request has been invalidated');880 }881 ReadableByteStreamControllerRespond(this._associatedReadableByteStreamController, bytesWritten);882 }883 respondWithNewView(view) {884 if (IsReadableStreamBYOBRequest(this) === false) {885 throw byobRequestBrandCheckException('respond');886 }887 if (this._associatedReadableByteStreamController === undefined) {888 throw new TypeError('This BYOB request has been invalidated');889 }890 if (!ArrayBuffer.isView(view)) {891 throw new TypeError('You can only respond with array buffer views');892 }893 ReadableByteStreamControllerRespondWithNewView(this._associatedReadableByteStreamController, view);894 }895}896class ReadableByteStreamController {897 constructor(stream, underlyingByteSource, highWaterMark) {898 if (IsReadableStream(stream) === false) {899 throw new TypeError('ReadableByteStreamController can only be constructed with a ReadableStream instance given ' +900 'a byte source');901 }902 if (stream._readableStreamController !== undefined) {903 throw new TypeError(904 'ReadableByteStreamController instances can only be created by the ReadableStream constructor given a byte ' +905 'source');906 }907 this._controlledReadableStream = stream;908 this._underlyingByteSource = underlyingByteSource;909 this._pullAgain = false;910 this._pulling = false;911 ReadableByteStreamControllerClearPendingPullIntos(this);912 this._queue = [];913 this._totalQueuedBytes = 0;914 this._closeRequested = false;915 this._started = false;916 this._strategyHWM = ValidateAndNormalizeHighWaterMark(highWaterMark);917 const autoAllocateChunkSize = 
underlyingByteSource.autoAllocateChunkSize;918 if (autoAllocateChunkSize !== undefined) {919 if (Number.isInteger(autoAllocateChunkSize) === false || autoAllocateChunkSize <= 0) {920 throw new RangeError('autoAllocateChunkSize must be a positive integer');921 }922 }923 this._autoAllocateChunkSize = autoAllocateChunkSize;924 this._pendingPullIntos = [];925 const controller = this;926 const startResult = InvokeOrNoop(underlyingByteSource, 'start', [this]);927 Promise.resolve(startResult).then(928 () => {929 controller._started = true;930 assert(controller._pulling === false);931 assert(controller._pullAgain === false);932 ReadableByteStreamControllerCallPullIfNeeded(controller);933 },934 r => {935 if (stream._state === 'readable') {936 ReadableByteStreamControllerError(controller, r);937 }938 }939 )940 .catch(rethrowAssertionErrorRejection);941 }942 get byobRequest() {943 if (IsReadableByteStreamController(this) === false) {944 throw byteStreamControllerBrandCheckException('byobRequest');945 }946 if (this._byobRequest === undefined && this._pendingPullIntos.length > 0) {947 const firstDescriptor = this._pendingPullIntos[0];948 const view = new Uint8Array(firstDescriptor.buffer,949 firstDescriptor.byteOffset + firstDescriptor.bytesFilled,950 firstDescriptor.byteLength - firstDescriptor.bytesFilled);951 this._byobRequest = new ReadableStreamBYOBRequest(this, view);952 }953 return this._byobRequest;954 }955 get desiredSize() {956 if (IsReadableByteStreamController(this) === false) {957 throw byteStreamControllerBrandCheckException('desiredSize');958 }959 return ReadableByteStreamControllerGetDesiredSize(this);960 }961 close() {962 if (IsReadableByteStreamController(this) === false) {963 throw byteStreamControllerBrandCheckException('close');964 }965 if (this._closeRequested === true) {966 throw new TypeError('The stream has already been closed; do not close it again!');967 }968 const state = this._controlledReadableStream._state;969 if (state !== 'readable') {970 throw 
new TypeError(`The stream (in ${state} state) is not in the readable state and cannot be closed`);971 }972 ReadableByteStreamControllerClose(this);973 }974 enqueue(chunk) {975 if (IsReadableByteStreamController(this) === false) {976 throw byteStreamControllerBrandCheckException('enqueue');977 }978 if (this._closeRequested === true) {979 throw new TypeError('stream is closed or draining');980 }981 const state = this._controlledReadableStream._state;982 if (state !== 'readable') {983 throw new TypeError(`The stream (in ${state} state) is not in the readable state and cannot be enqueued to`);984 }985 if (!ArrayBuffer.isView(chunk)) {986 throw new TypeError('You can only enqueue array buffer views when using a ReadableByteStreamController');987 }988 ReadableByteStreamControllerEnqueue(this, chunk);989 }990 error(e) {991 if (IsReadableByteStreamController(this) === false) {992 throw byteStreamControllerBrandCheckException('error');993 }994 const stream = this._controlledReadableStream;995 if (stream._state !== 'readable') {996 throw new TypeError(`The stream is ${stream._state} and so cannot be errored`);997 }998 ReadableByteStreamControllerError(this, e);999 }1000 [InternalCancel](reason) {1001 if (this._pendingPullIntos.length > 0) {1002 const firstDescriptor = this._pendingPullIntos[0];1003 firstDescriptor.bytesFilled = 0;1004 }1005 this._queue = [];1006 this._totalQueuedBytes = 0;1007 return PromiseInvokeOrNoop(this._underlyingByteSource, 'cancel', [reason]);1008 }1009 [InternalPull]() {1010 const stream = this._controlledReadableStream;1011 assert(ReadableStreamHasDefaultReader(stream) === true);1012 if (this._totalQueuedBytes > 0) {1013 assert(ReadableStreamGetNumReadRequests(stream) === 0);1014 const entry = this._queue.shift();1015 this._totalQueuedBytes -= entry.byteLength;1016 ReadableByteStreamControllerHandleQueueDrain(this);1017 let view;1018 try {1019 view = new Uint8Array(entry.buffer, entry.byteOffset, entry.byteLength);1020 } catch (viewE) {1021 return 
Promise.reject(viewE);1022 }1023 return Promise.resolve(CreateIterResultObject(view, false));1024 }1025 const autoAllocateChunkSize = this._autoAllocateChunkSize;1026 if (autoAllocateChunkSize !== undefined) {1027 let buffer;1028 try {1029 buffer = new ArrayBuffer(autoAllocateChunkSize);1030 } catch (bufferE) {1031 return Promise.reject(bufferE);1032 }1033 const pullIntoDescriptor = {1034 buffer,1035 byteOffset: 0,1036 byteLength: autoAllocateChunkSize,1037 bytesFilled: 0,1038 elementSize: 1,1039 ctor: Uint8Array,1040 readerType: 'default'1041 };1042 this._pendingPullIntos.push(pullIntoDescriptor);1043 }1044 const promise = ReadableStreamAddReadRequest(stream);1045 ReadableByteStreamControllerCallPullIfNeeded(this);1046 return promise;1047 }1048}1049// Abstract operations for the ReadableByteStreamController.1050function IsReadableByteStreamController(x) {1051 if (!typeIsObject(x)) {1052 return false;1053 }1054 if (!Object.prototype.hasOwnProperty.call(x, '_underlyingByteSource')) {1055 return false;1056 }1057 return true;1058}1059function IsReadableStreamBYOBRequest(x) {1060 if (!typeIsObject(x)) {1061 return false;1062 }1063 if (!Object.prototype.hasOwnProperty.call(x, '_associatedReadableByteStreamController')) {1064 return false;1065 }1066 return true;1067}1068function ReadableByteStreamControllerCallPullIfNeeded(controller) {1069 const shouldPull = ReadableByteStreamControllerShouldCallPull(controller);1070 if (shouldPull === false) {1071 return undefined;1072 }1073 if (controller._pulling === true) {1074 controller._pullAgain = true;1075 return undefined;1076 }1077 assert(controller._pullAgain === false);1078 controller._pulling = true;1079 // TODO: Test controller argument1080 const pullPromise = PromiseInvokeOrNoop(controller._underlyingByteSource, 'pull', [controller]);1081 pullPromise.then(1082 () => {1083 controller._pulling = false;1084 if (controller._pullAgain === true) {1085 controller._pullAgain = false;1086 
ReadableByteStreamControllerCallPullIfNeeded(controller);1087 }1088 },1089 e => {1090 if (controller._controlledReadableStream._state === 'readable') {1091 ReadableByteStreamControllerError(controller, e);1092 }1093 }1094 )1095 .catch(rethrowAssertionErrorRejection);1096 return undefined;1097}1098function ReadableByteStreamControllerClearPendingPullIntos(controller) {1099 ReadableByteStreamControllerInvalidateBYOBRequest(controller);1100 controller._pendingPullIntos = [];1101}1102function ReadableByteStreamControllerCommitPullIntoDescriptor(stream, pullIntoDescriptor) {1103 assert(stream._state !== 'errored', 'state must not be errored');1104 let done = false;1105 if (stream._state === 'closed') {1106 assert(pullIntoDescriptor.bytesFilled === 0);1107 done = true;1108 }1109 const filledView = ReadableByteStreamControllerConvertPullIntoDescriptor(pullIntoDescriptor);1110 if (pullIntoDescriptor.readerType === 'default') {1111 ReadableStreamFulfillReadRequest(stream, filledView, done);1112 } else {1113 assert(pullIntoDescriptor.readerType === 'byob');1114 ReadableStreamFulfillReadIntoRequest(stream, filledView, done);1115 }1116}1117function ReadableByteStreamControllerConvertPullIntoDescriptor(pullIntoDescriptor) {1118 const bytesFilled = pullIntoDescriptor.bytesFilled;1119 const elementSize = pullIntoDescriptor.elementSize;1120 assert(bytesFilled <= pullIntoDescriptor.byteLength);1121 assert(bytesFilled % elementSize === 0);1122 return new pullIntoDescriptor.ctor(1123 pullIntoDescriptor.buffer, pullIntoDescriptor.byteOffset, bytesFilled / elementSize);1124}1125function ReadableByteStreamControllerEnqueueChunkToQueue(controller, buffer, byteOffset, byteLength) {1126 controller._queue.push({ buffer, byteOffset, byteLength });1127 controller._totalQueuedBytes += byteLength;1128}1129function ReadableByteStreamControllerFillPullIntoDescriptorFromQueue(controller, pullIntoDescriptor) {1130 const elementSize = pullIntoDescriptor.elementSize;1131 const currentAlignedBytes = 
pullIntoDescriptor.bytesFilled - pullIntoDescriptor.bytesFilled % elementSize;1132 const maxBytesToCopy = Math.min(controller._totalQueuedBytes,1133 pullIntoDescriptor.byteLength - pullIntoDescriptor.bytesFilled);1134 const maxBytesFilled = pullIntoDescriptor.bytesFilled + maxBytesToCopy;1135 const maxAlignedBytes = maxBytesFilled - maxBytesFilled % elementSize;1136 let totalBytesToCopyRemaining = maxBytesToCopy;1137 let ready = false;1138 if (maxAlignedBytes > currentAlignedBytes) {1139 totalBytesToCopyRemaining = maxAlignedBytes - pullIntoDescriptor.bytesFilled;1140 ready = true;1141 }1142 const queue = controller._queue;1143 while (totalBytesToCopyRemaining > 0) {1144 const headOfQueue = queue[0];1145 const bytesToCopy = Math.min(totalBytesToCopyRemaining, headOfQueue.byteLength);1146 const destStart = pullIntoDescriptor.byteOffset + pullIntoDescriptor.bytesFilled;1147 ArrayBufferCopy(pullIntoDescriptor.buffer, destStart, headOfQueue.buffer, headOfQueue.byteOffset, bytesToCopy);1148 if (headOfQueue.byteLength === bytesToCopy) {1149 queue.shift();1150 } else {1151 headOfQueue.byteOffset += bytesToCopy;1152 headOfQueue.byteLength -= bytesToCopy;1153 }1154 controller._totalQueuedBytes -= bytesToCopy;1155 ReadableByteStreamControllerFillHeadPullIntoDescriptor(controller, bytesToCopy, pullIntoDescriptor);1156 totalBytesToCopyRemaining -= bytesToCopy;1157 }1158 if (ready === false) {1159 assert(controller._totalQueuedBytes === 0, 'queue must be empty');1160 assert(pullIntoDescriptor.bytesFilled > 0);1161 assert(pullIntoDescriptor.bytesFilled < pullIntoDescriptor.elementSize);1162 }1163 return ready;1164}1165function ReadableByteStreamControllerFillHeadPullIntoDescriptor(controller, size, pullIntoDescriptor) {1166 assert(controller._pendingPullIntos.length === 0 || controller._pendingPullIntos[0] === pullIntoDescriptor);1167 ReadableByteStreamControllerInvalidateBYOBRequest(controller);1168 pullIntoDescriptor.bytesFilled += size;1169}1170function 
ReadableByteStreamControllerHandleQueueDrain(controller) {1171 assert(controller._controlledReadableStream._state === 'readable');1172 if (controller._totalQueuedBytes === 0 && controller._closeRequested === true) {1173 ReadableStreamClose(controller._controlledReadableStream);1174 } else {1175 ReadableByteStreamControllerCallPullIfNeeded(controller);1176 }1177}1178function ReadableByteStreamControllerInvalidateBYOBRequest(controller) {1179 if (controller._byobRequest === undefined) {1180 return;1181 }1182 controller._byobRequest._associatedReadableByteStreamController = undefined;1183 controller._byobRequest._view = undefined;1184 controller._byobRequest = undefined;1185}1186function ReadableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(controller) {1187 assert(controller._closeRequested === false);1188 while (controller._pendingPullIntos.length > 0) {1189 if (controller._totalQueuedBytes === 0) {1190 return;1191 }1192 const pullIntoDescriptor = controller._pendingPullIntos[0];1193 if (ReadableByteStreamControllerFillPullIntoDescriptorFromQueue(controller, pullIntoDescriptor) === true) {1194 ReadableByteStreamControllerShiftPendingPullInto(controller);1195 ReadableByteStreamControllerCommitPullIntoDescriptor(controller._controlledReadableStream, pullIntoDescriptor);1196 }1197 }1198}1199function ReadableByteStreamControllerPullInto(controller, view) {1200 const stream = controller._controlledReadableStream;1201 let elementSize = 1;1202 if (view.constructor !== DataView) {1203 elementSize = view.constructor.BYTES_PER_ELEMENT;1204 }1205 const ctor = view.constructor;1206 const pullIntoDescriptor = {1207 buffer: view.buffer,1208 byteOffset: view.byteOffset,1209 byteLength: view.byteLength,1210 bytesFilled: 0,1211 elementSize,1212 ctor,1213 readerType: 'byob'1214 };1215 if (controller._pendingPullIntos.length > 0) {1216 pullIntoDescriptor.buffer = SameRealmTransfer(pullIntoDescriptor.buffer);1217 controller._pendingPullIntos.push(pullIntoDescriptor);1218 // 
No ReadableByteStreamControllerCallPullIfNeeded() call since:1219 // - No change happens on desiredSize1220 // - The source has already been notified of that there's at least 1 pending read(view)1221 return ReadableStreamAddReadIntoRequest(stream);1222 }1223 if (stream._state === 'closed') {1224 const emptyView = new view.constructor(pullIntoDescriptor.buffer, pullIntoDescriptor.byteOffset, 0);1225 return Promise.resolve(CreateIterResultObject(emptyView, true));1226 }1227 if (controller._totalQueuedBytes > 0) {1228 if (ReadableByteStreamControllerFillPullIntoDescriptorFromQueue(controller, pullIntoDescriptor) === true) {1229 const filledView = ReadableByteStreamControllerConvertPullIntoDescriptor(pullIntoDescriptor);1230 ReadableByteStreamControllerHandleQueueDrain(controller);1231 return Promise.resolve(CreateIterResultObject(filledView, false));1232 }1233 if (controller._closeRequested === true) {1234 const e = new TypeError('Insufficient bytes to fill elements in the given buffer');1235 ReadableByteStreamControllerError(controller, e);1236 return Promise.reject(e);1237 }1238 }1239 pullIntoDescriptor.buffer = SameRealmTransfer(pullIntoDescriptor.buffer);1240 controller._pendingPullIntos.push(pullIntoDescriptor);1241 const promise = ReadableStreamAddReadIntoRequest(stream);1242 ReadableByteStreamControllerCallPullIfNeeded(controller);1243 return promise;1244}1245function ReadableByteStreamControllerRespondInClosedState(controller, firstDescriptor) {1246 firstDescriptor.buffer = SameRealmTransfer(firstDescriptor.buffer);1247 assert(firstDescriptor.bytesFilled === 0, 'bytesFilled must be 0');1248 const stream = controller._controlledReadableStream;1249 while (ReadableStreamGetNumReadIntoRequests(stream) > 0) {1250 const pullIntoDescriptor = ReadableByteStreamControllerShiftPendingPullInto(controller);1251 ReadableByteStreamControllerCommitPullIntoDescriptor(stream, pullIntoDescriptor);1252 }1253}1254function 
ReadableByteStreamControllerRespondInReadableState(controller, bytesWritten, pullIntoDescriptor) {1255 if (pullIntoDescriptor.bytesFilled + bytesWritten > pullIntoDescriptor.byteLength) {1256 throw new RangeError('bytesWritten out of range');1257 }1258 ReadableByteStreamControllerFillHeadPullIntoDescriptor(controller, bytesWritten, pullIntoDescriptor);1259 if (pullIntoDescriptor.bytesFilled < pullIntoDescriptor.elementSize) {1260 // TODO: Figure out whether we should detach the buffer or not here.1261 return;1262 }1263 ReadableByteStreamControllerShiftPendingPullInto(controller);1264 const remainderSize = pullIntoDescriptor.bytesFilled % pullIntoDescriptor.elementSize;1265 if (remainderSize > 0) {1266 const end = pullIntoDescriptor.byteOffset + pullIntoDescriptor.bytesFilled;1267 const remainder = pullIntoDescriptor.buffer.slice(end - remainderSize, end);1268 ReadableByteStreamControllerEnqueueChunkToQueue(controller, remainder, 0, remainder.byteLength);1269 }1270 pullIntoDescriptor.buffer = SameRealmTransfer(pullIntoDescriptor.buffer);1271 pullIntoDescriptor.bytesFilled -= remainderSize;1272 ReadableByteStreamControllerCommitPullIntoDescriptor(controller._controlledReadableStream, pullIntoDescriptor);1273 ReadableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(controller);1274}1275function ReadableByteStreamControllerRespondInternal(controller, bytesWritten) {1276 const firstDescriptor = controller._pendingPullIntos[0];1277 const stream = controller._controlledReadableStream;1278 if (stream._state === 'closed') {1279 if (bytesWritten !== 0) {1280 throw new TypeError('bytesWritten must be 0 when calling respond() on a closed stream');1281 }1282 ReadableByteStreamControllerRespondInClosedState(controller, firstDescriptor);1283 } else {1284 assert(stream._state === 'readable');1285 ReadableByteStreamControllerRespondInReadableState(controller, bytesWritten, firstDescriptor);1286 }1287}1288function ReadableByteStreamControllerShiftPendingPullInto(controller) 
{1289 const descriptor = controller._pendingPullIntos.shift();1290 ReadableByteStreamControllerInvalidateBYOBRequest(controller);1291 return descriptor;1292}1293function ReadableByteStreamControllerShouldCallPull(controller) {1294 const stream = controller._controlledReadableStream;1295 if (stream._state !== 'readable') {1296 return false;1297 }1298 if (controller._closeRequested === true) {1299 return false;1300 }1301 if (controller._started === false) {1302 return false;1303 }1304 if (ReadableStreamHasDefaultReader(stream) && ReadableStreamGetNumReadRequests(stream) > 0) {1305 return true;1306 }1307 if (ReadableStreamHasBYOBReader(stream) && ReadableStreamGetNumReadIntoRequests(stream) > 0) {1308 return true;1309 }1310 if (ReadableByteStreamControllerGetDesiredSize(controller) > 0) {1311 return true;1312 }1313 return false;1314}1315// A client of ReadableByteStreamController may use these functions directly to bypass state check.1316function ReadableByteStreamControllerClose(controller) {1317 const stream = controller._controlledReadableStream;1318 assert(controller._closeRequested === false);1319 assert(stream._state === 'readable');1320 if (controller._totalQueuedBytes > 0) {1321 controller._closeRequested = true;1322 return;1323 }1324 if (controller._pendingPullIntos.length > 0) {1325 const firstPendingPullInto = controller._pendingPullIntos[0];1326 if (firstPendingPullInto.bytesFilled > 0) {1327 const e = new TypeError('Insufficient bytes to fill elements in the given buffer');1328 ReadableByteStreamControllerError(controller, e);1329 throw e;1330 }1331 }1332 ReadableStreamClose(stream);1333}1334function ReadableByteStreamControllerEnqueue(controller, chunk) {1335 const stream = controller._controlledReadableStream;1336 assert(controller._closeRequested === false);1337 assert(stream._state === 'readable');1338 const buffer = chunk.buffer;1339 const byteOffset = chunk.byteOffset;1340 const byteLength = chunk.byteLength;1341 const transferredBuffer = 
SameRealmTransfer(buffer);1342 if (ReadableStreamHasDefaultReader(stream) === true) {1343 if (ReadableStreamGetNumReadRequests(stream) === 0) {1344 ReadableByteStreamControllerEnqueueChunkToQueue(controller, transferredBuffer, byteOffset, byteLength);1345 } else {1346 assert(controller._queue.length === 0);1347 const transferredView = new Uint8Array(transferredBuffer, byteOffset, byteLength);1348 ReadableStreamFulfillReadRequest(stream, transferredView, false);1349 }1350 } else if (ReadableStreamHasBYOBReader(stream) === true) {1351 // TODO: Ideally in this branch detaching should happen only if the buffer is not consumed fully.1352 ReadableByteStreamControllerEnqueueChunkToQueue(controller, transferredBuffer, byteOffset, byteLength);1353 ReadableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(controller);1354 } else {1355 assert(IsReadableStreamLocked(stream) === false, 'stream must not be locked');1356 ReadableByteStreamControllerEnqueueChunkToQueue(controller, transferredBuffer, byteOffset, byteLength);1357 }1358}1359function ReadableByteStreamControllerError(controller, e) {1360 const stream = controller._controlledReadableStream;1361 assert(stream._state === 'readable');1362 ReadableByteStreamControllerClearPendingPullIntos(controller);1363 controller._queue = [];1364 ReadableStreamError(stream, e);1365}1366function ReadableByteStreamControllerGetDesiredSize(controller) {1367 return controller._strategyHWM - controller._totalQueuedBytes;1368}1369function ReadableByteStreamControllerRespond(controller, bytesWritten) {1370 bytesWritten = Number(bytesWritten);1371 if (IsFiniteNonNegativeNumber(bytesWritten) === false) {1372 throw new RangeError('bytesWritten must be a finite');1373 }1374 assert(controller._pendingPullIntos.length > 0);1375 ReadableByteStreamControllerRespondInternal(controller, bytesWritten);1376}1377function ReadableByteStreamControllerRespondWithNewView(controller, view) {1378 assert(controller._pendingPullIntos.length > 0);1379 const 
firstDescriptor = controller._pendingPullIntos[0];1380 if (firstDescriptor.byteOffset + firstDescriptor.bytesFilled !== view.byteOffset) {1381 throw new RangeError('The region specified by view does not match byobRequest');1382 }1383 if (firstDescriptor.byteLength !== view.byteLength) {1384 throw new RangeError('The buffer of view has different capacity than byobRequest');1385 }1386 firstDescriptor.buffer = view.buffer;1387 ReadableByteStreamControllerRespondInternal(controller, view.byteLength);1388}1389// Helper functions for the ReadableStream.1390function streamBrandCheckException(name) {1391 return new TypeError(`ReadableStream.prototype.${name} can only be used on a ReadableStream`);1392}1393// Helper functions for the readers.1394function readerLockException(name) {1395 return new TypeError('Cannot ' + name + ' a stream using a released reader');1396}1397// Helper functions for the ReadableStreamDefaultReader.1398function defaultReaderBrandCheckException(name) {1399 return new TypeError(1400 `ReadableStreamDefaultReader.prototype.${name} can only be used on a ReadableStreamDefaultReader`);1401}1402function defaultReaderClosedPromiseInitialize(reader) {1403 reader._closedPromise = new Promise((resolve, reject) => {1404 reader._closedPromise_resolve = resolve;1405 reader._closedPromise_reject = reject;1406 });1407}1408function defaultReaderClosedPromiseInitializeAsRejected(reader, reason) {1409 reader._closedPromise = Promise.reject(reason);1410 reader._closedPromise_resolve = undefined;1411 reader._closedPromise_reject = undefined;1412}1413function defaultReaderClosedPromiseInitializeAsResolved(reader) {1414 reader._closedPromise = Promise.resolve(undefined);1415 reader._closedPromise_resolve = undefined;1416 reader._closedPromise_reject = undefined;...

Full Screen

Full Screen

generic-reader.ts

Source:generic-reader.ts Github

copy

Full Screen

...4export function ReadableStreamReaderGenericInitialize<R>(reader: ReadableStreamReader<R>, stream: ReadableStream<R>) {5 reader._ownerReadableStream = stream;6 stream._reader = reader;7 if (stream._state === 'readable') {8 defaultReaderClosedPromiseInitialize(reader);9 } else if (stream._state === 'closed') {10 defaultReaderClosedPromiseInitializeAsResolved(reader);11 } else {12 assert(stream._state === 'errored');13 defaultReaderClosedPromiseInitializeAsRejected(reader, stream._storedError);14 }15}16// A client of ReadableStreamDefaultReader and ReadableStreamBYOBReader may use these functions directly to bypass state17// check.18export function ReadableStreamReaderGenericCancel(reader: ReadableStreamReader<any>, reason: any): Promise<void> {19 const stream = reader._ownerReadableStream;20 assert(stream !== undefined);21 return ReadableStreamCancel(stream, reason);22}23export function ReadableStreamReaderGenericRelease(reader: ReadableStreamReader<any>) {24 assert(reader._ownerReadableStream !== undefined);25 assert(reader._ownerReadableStream._reader === reader);26 if (reader._ownerReadableStream._state === 'readable') {27 defaultReaderClosedPromiseReject(28 reader,29 new TypeError(`Reader was released and can no longer be used to monitor the stream's closedness`));30 } else {31 defaultReaderClosedPromiseResetToRejected(32 reader,33 new TypeError(`Reader was released and can no longer be used to monitor the stream's closedness`));34 }35 reader._ownerReadableStream._reader = undefined;36 reader._ownerReadableStream = undefined!;37}38// Helper functions for the readers.39export function readerLockException(name: string): TypeError {40 return new TypeError('Cannot ' + name + ' a stream using a released reader');41}42// Helper functions for the ReadableStreamDefaultReader.43export function defaultReaderClosedPromiseInitialize(reader: ReadableStreamReader<any>) {44 reader._closedPromise = newPromise((resolve, reject) => {45 reader._closedPromise_resolve = 
resolve;46 reader._closedPromise_reject = reject;47 });48}49export function defaultReaderClosedPromiseInitializeAsRejected(reader: ReadableStreamReader<any>, reason: any) {50 defaultReaderClosedPromiseInitialize(reader);51 defaultReaderClosedPromiseReject(reader, reason);52}53export function defaultReaderClosedPromiseInitializeAsResolved(reader: ReadableStreamReader<any>) {54 defaultReaderClosedPromiseInitialize(reader);55 defaultReaderClosedPromiseResolve(reader);56}57export function defaultReaderClosedPromiseReject(reader: ReadableStreamReader<any>, reason: any) {58 assert(reader._closedPromise_resolve !== undefined);59 assert(reader._closedPromise_reject !== undefined);60 setPromiseIsHandledToTrue(reader._closedPromise);61 reader._closedPromise_reject!(reason);62 reader._closedPromise_resolve = undefined;63 reader._closedPromise_reject = undefined;64}65export function defaultReaderClosedPromiseResetToRejected(reader: ReadableStreamReader<any>, reason: any) {66 assert(reader._closedPromise_resolve === undefined);67 assert(reader._closedPromise_reject === undefined);68 defaultReaderClosedPromiseInitializeAsRejected(reader, reason);...

Full Screen

Full Screen

Using AI Code Generation

copy

Full Screen

1promise_test(async t => {2 const reader = new ReadableStream().getReader();3 const promise = reader.closed;4 reader.releaseLock();5 await promise;6}, 'ReadableStream with no chunks');7promise_test(async t => {8 const reader = new ReadableStream({9 start(controller) {10 controller.enqueue('a');11 },12 }).getReader();13 const promise = reader.closed;14 reader.releaseLock();15 await promise;16}, 'ReadableStream with one chunk');17promise_test(async t => {18 const rs = new ReadableStream({19 start(controller) {20 controller.enqueue('a');21 controller.enqueue('b');22 },23 });24 const reader = rs.getReader();25 const promise = reader.closed;26 reader.releaseLock();27 await promise;28}, 'ReadableStream with two chunks');29promise_test(async t => {30 const rs = new ReadableStream({31 start(controller) {32 controller.enqueue('a');33 controller.enqueue('b');34 },35 });36 const reader = rs.getReader();37 const promise = reader.closed;38 reader.releaseLock();39 assert_equals(await rs.getReader().closed, undefined, 'the stream should be still readable');40 await promise;41}, 'ReadableStream with two chunks and a reader that is not closed');42promise_test(async t => {43 const rs = new ReadableStream({44 start(controller) {45 controller.enqueue('a');46 controller.enqueue('b');47 },48 });49 const reader = rs.getReader();50 const promise = reader.closed;51 reader.releaseLock();52 assert_equals(await rs.getReader().closed, undefined, 'the stream should be still readable');53 await promise;54}, 'ReadableStream with two chunks and a reader that is not closed');55promise_test(async t => {56 const rs = new ReadableStream({57 start(controller) {58 controller.enqueue('a');59 controller.enqueue('b');60 },61 });62 const reader = rs.getReader();63 const promise = reader.closed;64 reader.releaseLock();65 rs.cancel();66 await promise;67}, 'ReadableStream with two chunks and a reader that is not closed');68promise_test(async t => {69 const rs = new ReadableStream({70 start(controller) {71 
controller.enqueue('a');72 controller.enqueue('b');73 },74 });75 const reader = rs.getReader();76 const promise = reader.closed;77 reader.releaseLock();78 rs.cancel();

Full Screen

Using AI Code Generation

copy

Full Screen

1import { defaultReaderClosedPromiseInitialize } from './wpt.js';2const reader = new ReadableStream().getReader();3const closed = defaultReaderClosedPromiseInitialize(reader);4closed.then(() => console.log('closed!'));5reader.releaseLock();6export function defaultReaderClosedPromiseInitialize(reader) {7 return new Promise((resolve, reject) => {8 reader.closed.then(9 () => resolve(),10 r => reject(r)11 );12 });13}14{15}16export function defaultReaderClosedPromiseInitialize(reader) {17 return reader.closed;18}19{20}

Full Screen

Using AI Code Generation

copy

Full Screen

// Observe a reader's `closed` promise through the public `closed` getter.
//
// The earlier version of this snippet called `.then` on the return value of
// defaultReaderClosedPromiseInitialize(reader). In the readable-stream.ts
// listing on this page that helper only assigns reader._closedPromise and its
// resolve/reject callbacks; it has no return statement, so there is nothing
// to chain on. The snippet also never imported or defined the helper, and it
// was pasted twice, redeclaring `reader` and `promise`.
const reader = new ReadableStream().getReader();

reader.closed.then(
  () => console.log('closed'),
  // ReadableStreamReaderGenericRelease (see the generic-reader.ts listing)
  // rejects the closed promise with a TypeError when the reader is released,
  // so the rejection branch must be handled as well.
  (reason) => console.log('closed rejected:', reason)
);

reader.releaseLock();

Full Screen

Using AI Code Generation

copy

Full Screen

// Reading `reader.closed` after releaseLock() yields a promise; it does not
// throw. The generic-reader.ts listing on this page shows
// ReadableStreamReaderGenericRelease rejecting the closed promise with a
// TypeError, so the expectation has to be a promise rejection (promise_test +
// promise_rejects_js) instead of a synchronous assert_throws.
//
// The snippet previously repeated this same test eight times under one name;
// a single copy carries the same coverage.
promise_test(t => {
  const reader = new ReadableStream().getReader();
  reader.releaseLock();
  return promise_rejects_js(t, TypeError, reader.closed);
}, 'closed should reject with a TypeError after releaseLock()');

Full Screen

Using AI Code Generation

copy

Full Screen

1async function test() {2 const defaultReaderClosedPromise = wpt.defaultReaderClosedPromiseInitialize();3 wpt.defaultReaderClosedPromiseResolve(defaultReaderClosedPromise);4}5test();6const wpt = {7 defaultReaderClosedPromiseInitialize() {8 return new Promise((resolve, reject) => {9 this.defaultReaderClosedPromiseResolve = resolve;10 this.defaultReaderClosedPromiseReject = reject;11 });12 },13};14async function test() {15 const defaultReaderClosedPromise = wpt.defaultReaderClosedPromiseInitialize();16 wpt.defaultReaderClosedPromiseResolve(defaultReaderClosedPromise);17}18test();19const wpt = {20 defaultReaderClosedPromiseInitialize() {21 return new Promise((resolve, reject) => {22 this.defaultReaderClosedPromiseResolve = resolve;23 this.defaultReaderClosedPromiseReject = reject;24 });25 },26};27async function test() {28 const defaultReaderClosedPromise = wpt.defaultReaderClosedPromiseInitialize();29 wpt.defaultReaderClosedPromiseResolve(defaultReaderClosedPromise);30}31test();32const wpt = {33 defaultReaderClosedPromiseInitialize() {34 return new Promise((resolve, reject) => {35 this.defaultReaderClosedPromiseResolve = resolve;36 this.defaultReaderClosedPromiseReject = reject;37 });38 },39};40async function test() {41 const defaultReaderClosedPromise = wpt.defaultReaderClosedPromiseInitialize();42 wpt.defaultReaderClosedPromiseResolve(defaultReaderClosedPromise);43}44test();45const wpt = {46 defaultReaderClosedPromiseInitialize() {47 return new Promise((resolve, reject) => {48 this.defaultReaderClosedPromiseResolve = resolve;49 this.defaultReaderClosedPromiseReject = reject;50 });51 },52};53async function test() {

Full Screen

Using AI Code Generation

copy

Full Screen

1const reader = stream.getReader();2const {value, done} = await reader.read();3assert_equals(value, 1);4assert_equals(done, false);5const {value, done} = await reader.read();6assert_equals(value, 2);7assert_equals(done, false);8const {value, done} = await reader.read();9assert_equals(value, 3);10assert_equals(done, false);11const {value, done} = await reader.read();12assert_equals(value, undefined);13assert_equals(done, true);14const reader = stream.getReader();15const {value, done} = await reader.read();16assert_equals(value, 1);17assert_equals(done, false);18const {value, done} = await reader.read();19assert_equals(value, 2);20assert_equals(done, false);21const {value, done} = await reader.read();22assert_equals(value, 3);23assert_equals(done, false);24const {value, done} = await reader.read();25assert_equals(value, undefined);26assert_equals(done, true);27const reader = stream.getReader();28const {value, done} = await reader.read();29assert_equals(value, 1);30assert_equals(done, false);31const {value, done} = await reader.read();32assert_equals(value, 2);33assert_equals(done, false);34const {value, done} = await reader.read();35assert_equals(value, 3);36assert_equals(done, false);37const {value, done} = await reader.read();38assert_equals(value, undefined);39assert_equals(done, true);40const reader = stream.getReader();41const {value, done} = await reader.read();42assert_equals(value, 1);43assert_equals(done, false);44const {value, done} = await reader.read();45assert_equals(value, 2);46assert_equals(done, false);47const {value, done} =

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks such as Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run wpt automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful