How to use the mergeStreams method in Playwright Internal

Best JavaScript code snippet using playwright-internal

Run Playwright Internal automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

streamUtils-test.js

Source: streamUtils-test.js Github

copy
1const assert = require("assert");
2const { Readable } = require("stream");
3const {
4  transformObject,
5  ignoreFirstLine,
6  ignoreEmpty,
7  pipeline,
8  mergeStreams,
9  writeObject,
10} = require("../../../../common/streamUtils");
11
/**
 * Builds an object-mode Readable that produces nothing by itself:
 * its read() is a no-op, so the caller supplies every chunk through push().
 */
const createStream = () => {
  const options = { objectMode: true };
  options.read = function read() {};
  return new Readable(options);
};
18
19describe(__filename, () => {
20  it("should transformObject and writeObject", done => {
21    let chunks = [];
22    let stream = createStream();
23    stream.push("andré");
24    stream.push("bruno");
25    stream.push(null);
26
27    stream
28      .pipe(transformObject(data => data.substring(0, 1)))
29      .pipe(writeObject(data => chunks.push(data)))
30      .on("finish", () => {
31        assert.deepStrictEqual(chunks, ["a", "b"]);
32        done();
33      });
34  });
35
36  it("should transformObject and writeObject (async)", done => {
37    let chunks = [];
38    let stream = createStream();
39    stream.push("andré");
40    stream.push("bruno");
41    stream.push(null);
42
43    stream
44      .pipe(
45        transformObject(async data => {
46          return new Promise(resolve => {
47            resolve(data.substring(0, 1));
48          });
49        })
50      )
51      .pipe(
52        writeObject(async data => {
53          return new Promise(resolve => {
54            chunks.push(data);
55            resolve();
56          });
57        })
58      )
59      .on("finish", () => {
60        assert.deepStrictEqual(chunks, ["a", "b"]);
61        done();
62      });
63  });
64
65  it("should transformObject and writeObject (async + parallel)", done => {
66    let chunks = [];
67    let stream = createStream();
68    stream.push("andré");
69    stream.push("bruno");
70    stream.push("robert");
71    stream.push(null);
72
73    stream
74      .pipe(
75        transformObject(
76          async data => {
77            return new Promise(resolve => {
78              resolve(data.substring(0, 1));
79            });
80          },
81          { parallel: 2 }
82        )
83      )
84      .pipe(
85        writeObject(
86          data => {
87            return new Promise(resolve => {
88              chunks.push(data);
89              return setTimeout(() => resolve(), 10);
90            });
91          },
92          { parallel: 2 }
93        )
94      )
95      .on("finish", () => {
96        assert.deepStrictEqual(chunks, ["a", "b", "r"]);
97        done();
98      });
99  });
100
101  it("should transformObject with error", done => {
102    let stream = createStream();
103    stream.push("andré");
104    stream.push(null);
105
106    stream
107      .pipe(
108        transformObject(() => {
109          throw new Error("An error occurred");
110        })
111      )
112      .on("data", () => ({}))
113      .on("error", e => {
114        assert.strictEqual(e.message, "An error occurred");
115        done();
116      })
117      .on("finish", () => {
118        assert.fail();
119        done();
120      });
121  });
122
123  it("should writeObject with error", done => {
124    let stream = createStream();
125    stream.push("andré");
126    stream.push(null);
127
128    stream
129      .pipe(
130        writeObject(() => {
131          throw new Error("An error occurred");
132        })
133      )
134      .on("error", e => {
135        assert.strictEqual(e.message, "An error occurred");
136        done();
137      })
138      .on("finish", () => {
139        assert.fail();
140        done();
141      });
142  });
143
144  it("can mergeStreams streams", done => {
145    let chunks = [];
146    let source = createStream();
147    source.push("andré");
148    source.push("bruno");
149    source.push("robert");
150    source.push(null);
151
152    mergeStreams(source)
153      .pipe(transformObject(data => data.substring(0, 1)))
154      .pipe(writeObject(data => chunks.push(data)))
155      .on("finish", () => {
156        assert.deepStrictEqual(chunks, ["a", "b", "r"]);
157        done();
158      });
159  });
160
161  it("can mergeStreams streams (error propagation)", done => {
162    let source = createStream();
163    source.push("andré");
164
165    mergeStreams(
166      source,
167      writeObject(() => ({}))
168    )
169      .on("error", e => {
170        assert.strictEqual(e, "Error from source");
171        done();
172      })
173      .on("finish", () => {
174        assert.fail();
175        done();
176      });
177
178    source.emit("error", "Error from source");
179  });
180
181  it("can mergeStreams streams (error propagation with promise)", done => {
182    let source = createStream();
183
184    mergeStreams(
185      source,
186      writeObject(() => ({}))
187    )
188      .then(() => {
189        assert.fail();
190        done();
191      })
192      .catch(e => {
193        assert.strictEqual(e, "emitted");
194        done();
195      });
196
197    source.emit("error", "emitted");
198  });
199
200  it("can mergeStreams streams (error propagation from a nested stream)", done => {
201    let source = createStream();
202    source.push("first");
203
204    mergeStreams(
205      source,
206      writeObject(() => {
207        throw new Error("write");
208      })
209    )
210      .on("error", e => {
211        assert.strictEqual(e.message, "write");
212        done();
213      })
214      .on("finish", () => {
215        assert.fail();
216        done();
217      });
218  });
219
220  it("can pipeline streams", async () => {
221    let chunks = [];
222    let source = createStream();
223    source.push("andré");
224    source.push("bruno");
225    source.push("robert");
226    source.push(null);
227
228    await pipeline(
229      source,
230      transformObject(data => data.substring(0, 1)),
231      writeObject(data => chunks.push(data))
232    );
233
234    assert.deepStrictEqual(chunks, ["a", "b", "r"]);
235  });
236
237  it("can pipeline streams (error propagation)", done => {
238    let source = createStream();
239
240    pipeline(
241      source,
242      writeObject(() => ({}))
243    )
244      .then(() => {
245        assert.fail();
246        done();
247      })
248      .catch(e => {
249        assert.strictEqual(e, "emitted");
250        done();
251      });
252
253    source.push("first");
254    source.emit("error", "emitted");
255  });
256
257  it("can pipeline streams (error callback propagation)", async () => {
258    let source = createStream();
259    let promise = pipeline(
260      source,
261      writeObject(() => {
262        throw new Error("An error occurred");
263      })
264    );
265
266    try {
267      source.push("andré");
268
269      await promise;
270      assert.fail();
271    } catch (e) {
272      assert.strictEqual(e.message, "An error occurred");
273    }
274  });
275
276  it("can use pipeline with mergeStreamsd streams", async () => {
277    let source = createStream();
278    let multi = mergeStreams(
279      source,
280      transformObject(d => d)
281    );
282
283    source.push("first");
284    source.push(null);
285
286    await pipeline(
287      multi,
288      writeObject(() => ({}))
289    );
290  });
291
292  it("should ignoreEmpty", done => {
293    let chunks = [];
294    let stream = createStream();
295    stream.push("first");
296    stream.push("");
297    stream.push(null);
298
299    stream
300      .pipe(ignoreEmpty())
301      .on("data", d => chunks.push(d))
302      .on("end", () => {
303        assert.deepStrictEqual(chunks, ["first"]);
304        done();
305      });
306  });
307
308  it("should ignoreFirstLine", done => {
309    let chunks = [];
310    let stream = createStream();
311    stream.push("first");
312    stream.push("second");
313    stream.push(null);
314
315    stream
316      .pipe(ignoreFirstLine())
317      .on("data", d => chunks.push(d))
318      .on("end", () => {
319        assert.deepStrictEqual(chunks, ["second"]);
320        done();
321      });
322  });
323});
324
Full Screen

migrations.js

Source: migrations.js Github

copy
1import {Migrations} from 'meteor/percolate:migrations';
2
3/*
4// 01.06.14 merge streams
5const migrateAt10614 = () => {
6  const mergeStreams = (streamTo, streamFrom) => {
7    GivenTasks.update({ streamId: streamFrom }, { $set: { streamId: streamTo } }, { multi: true });
8
9    const users = Meteor.users.find({ "profile.streams":streamFrom });
10
11    users.forEach((user) => {
12      let streamsIds = user.profile.streams.map((s) => {
13        return (s._id == streamFrom) ? {_id: streamTo} : s;
14      });
15
16      Meteor.users.update(user, { $set: { "profile.streams":streamsIds } });
17    });
18
19    Streams.remove(streamFrom);
20  }
21
22  mergeStreams("SbCvHpNynrcSwd9HY", "CXWYcdwNhBEGyuLzD"); // 2-ой с 6-ым
23  mergeStreams("fwPYXXKAB2Ftpz2sM", "FmGsF8534WwGHNoXB"); // 3-ий с 7-ым
24  mergeStreams("fwPYXXKAB2Ftpz2sM", "9L5MS5wAwQtGrbfAW"); // 3-ий с 8-ым
25  mergeStreams("5M6rnhF5EGed3qiLD", "LJASwBsDcE8n7SwnC"); // 4-ый с 1-ым
26
27  Meteor.users.update({ "profile.streams": "ZvYrFciXzsD8NQJHB" }, { $set: { "profile.mentor": "znksXwGwk5NdLGA7o" } }, { multi: 1 }); // 1-ый - Елена Морозова
28  Meteor.users.update({ "profile.streams": "SbCvHpNynrcSwd9HY" }, { $set: { "profile.mentor": "tPSxNPMrj99tBwcMu" } }, { multi: 1 }); // 2-ый - Мария Фалалеева
29  Meteor.users.update({ "profile.streams": "fwPYXXKAB2Ftpz2sM" }, { $set: { "profile.mentor": "n9QshxZoSGWL6zPC2" } }, { multi: 1 }); // 3-ый - Юля Аделова
30  Meteor.users.update({ "profile.streams": "5M6rnhF5EGed3qiLD" }, { $set: { "profile.mentor": "puxdzvQJrxJE96wFb" } }, { multi: 1 }); // 4-ый - Анна Соколова
31};*/
32
/**
 * Default export of the migrations module.
 * The only registration it contains (version 10614, 'merge streams') is
 * commented out, so calling this function currently does nothing.
 */
33export default function () {
34  // 01.06.14 merge streams
35  /*Migrations.add({
36    version: 10614,
37    name: 'merge streams',
38    up: migrateAt10614
39  });*/
40}
41
Full Screen

mergeStreams-test.js

Source: mergeStreams-test.js Github

copy
1const assert = require("assert");
2const { mergeStreams, writeData } = require("../index");
3const { createStream } = require("./testUtils");
4
5describe("mergeStreams", () => {
6  it("can merge multiple streams", (done) => {
7    let result = "";
8    let source1 = createStream(["andré"]);
9    let source2 = createStream(["bruno"]);
10
11    mergeStreams(source1, source2)
12      .pipe(writeData((data) => (result += data)))
13      .on("finish", () => {
14        assert.deepStrictEqual(result, "andrébruno");
15        done();
16      });
17  });
18
19  it("can merge multiple streams (factory)", (done) => {
20    let result = "";
21    mergeStreams(
22      () => createStream(["andré"]),
23      () => createStream(["bruno"])
24    )
25      .pipe(writeData((data) => (result += data)))
26      .on("finish", () => {
27        assert.deepStrictEqual(result, "andrébruno");
28        done();
29      });
30  });
31
32  it("can merge multiple streams (async factory)", (done) => {
33    let result = "";
34    mergeStreams(
35      () => Promise.resolve(createStream(["andré"])),
36      () => Promise.resolve(createStream(["bruno"]))
37    )
38      .pipe(writeData((data) => (result += data)))
39      .on("finish", () => {
40        assert.deepStrictEqual(result, "andrébruno");
41        done();
42      });
43  });
44
45  it("can merge multiple streams (sequentially)", (done) => {
46    let result = "";
47    let source1 = createStream(["andré"]);
48    let source2 = createStream(["bruno"]);
49
50    mergeStreams(source1, source2, { sequential: true })
51      .pipe(writeData((data) => (result += data)))
52      .on("finish", () => {
53        assert.deepStrictEqual(result, "andrébruno");
54        done();
55      });
56  });
57
58  it("can merge multiple streams (sequentially+factory)", (done) => {
59    let result = "";
60    mergeStreams(
61      () => createStream(["andré"]),
62      () => createStream(["bruno"]),
63      { sequential: true }
64    )
65      .pipe(writeData((data) => (result += data)))
66      .on("finish", () => {
67        assert.deepStrictEqual(result, "andrébruno");
68        done();
69      });
70  });
71
72  it("can iterate over a merged stream", async () => {
73    let source1 = createStream(["andré"]);
74    let source2 = createStream(["bruno"]);
75
76    let chunks = [];
77    for await (const chunk of mergeStreams(source1, source2)) {
78      chunks.push(chunk);
79    }
80
81    assert.deepStrictEqual(chunks, ["andré", "bruno"]);
82  });
83});
84
Full Screen

Accelerate Your Automation Test Cycles With LambdaTest

Leverage LambdaTest’s cloud-based platform to execute your automation tests in parallel and trim down your test execution time significantly. Your first 100 automation testing minutes are on us.

Try LambdaTest

Run JavaScript Tests on LambdaTest Cloud Grid

Execute automation tests with Playwright Internal on a cloud-based Grid of 3000+ real browsers and operating systems for both web and mobile applications.

Test now for Free
LambdaTestX

We use cookies to give you the best experience. Cookies help to provide a more personalized experience and relevant advertising for you, and web analytics for us. Learn more in our Cookie Policy, Privacy Policy & Terms of Service.

Allow Cookie
Sarah

I hope you find the best code examples for your project.

If you want to accelerate automated browser testing, try LambdaTest. Your first 100 automation testing minutes are FREE.

Sarah Elson (Product & Growth Lead)