How to use the stop_watching method in wpt

Best JavaScript code snippet using wpt

test_spec.js

Source: test_spec.js (GitHub)

copy

Full Screen

...126 onstart.call(r);127 });128 }129 };130 fsq.stop_watching(function ()131 {132 fsq_stopped = true;133 check();134 });135 fsq = new QlobberFSQ(Object.assign({},136 default_options,137 test_options));138 fsq.N = n;139 fsq.I = 0;140 maybe_ephemeral(fsq);141 make_buckets();142 ignore_ebusy(fsq);143 fsq.on('start', function ()144 {145 fsq_started = true;146 check();147 });148 r.on = function (evname, f)149 {150 if (evname !== 'start')151 {152 return orig_on.call(this, evname, f);153 }154 orig_on.call(this, 'start', function ()155 {156 onstart = f;157 check();158 });159 };160 }161 }162 return r;163 }164 if (getdents_size > 0)165 {166 require('../lib/process_all_getdents'); // account for file handle167 }168 if (use_disruptor)169 {170 Disruptor = require('shared-memory-disruptor').Disruptor;171 }172 beforeEach(function (cb)173 {174 if (getdents_size > 0)175 {176 test_options.getdents_size = getdents_size;177 }178 if (use_disruptor)179 {180 test_options.get_disruptor = function (bucket)181 {182 return new Disruptor('/test' + bucket,183 20 * 1024,184 2048,185 this.N,186 this.I,187 false,188 false);189 };190 }191 if ((getdents_size > 0) || use_disruptor)192 {193 return fsq.stop_watching(function ()194 {195 fsq = make_fsq(1, 0);196 if (getdents_size > 0)197 {198 fsq.on('getdents_disabled', function (err)199 {200 cb(err);201 });202 }203 ignore_ebusy(fsq);204 fsq.on('start', cb);205 });206 }207 cb();208 });209 var orig_ftruncate, orig_rename, orig_close, orig_flock;210 function restore()211 {212 fsq._fs.ftruncate = orig_ftruncate;213 fsq._fs.rename = orig_rename;214 fsq._fs.close = orig_close;215 fsq._fsext.flock = orig_flock;216 }217 beforeEach(function ()218 {219 orig_ftruncate = fsq._fs.ftruncate;220 orig_rename = fsq._fs.rename;221 orig_close = fsq._fs.close;222 orig_flock = fsq._fsext.flock;223 var busied_ftruncate = false,224 busied_rename = false,225 busied_close = false,226 busied_flock = false;227 fsq._fs.ftruncate = function (fd, size, cb)228 {229 if 
(busied_ftruncate)230 {231 busied_ftruncate = false;232 return orig_ftruncate.apply(this, arguments);233 }234 busied_ftruncate = true;235 cb({ code: 'EBUSY' });236 };237 fsq._fs.rename = function (src, dest, cb)238 {239 if (busied_rename)240 {241 busied_rename = false;242 return orig_rename.apply(this, arguments);243 }244 busied_rename = true;245 cb({ code: 'EBUSY' });246 };247 // fs.WriteStream calls fs.close when it ends so if we're not using248 // fs-ext then don't overwrite fs.close otherwise publish will error249 if (!single_supported)250 {251 return;252 }253 fsq._fs.close = function (fd, cb)254 {255 if (busied_close)256 {257 busied_close = false;258 return orig_close.apply(this, arguments);259 }260 busied_close = true;261 cb({ code: 'EBUSY' });262 };263 fsq._fsext.flock = function (fd, type, cb)264 {265 if (busied_flock)266 {267 busied_flock = false;268 return orig_flock.apply(this, arguments);269 }270 busied_flock = true;271 cb({ code: 'EBUSY' });272 };273 });274 afterEach(restore);275 it('should subscribe and publish to a simple topic', function (done)276 {277 var pub_info;278 fsq.subscribe('foo', function handler(data, info, cb)279 {280 expect(info.topic).to.equal('foo');281 expect(info.single).to.equal(false);282 expect(info.direct).to.equal(false);283 expect(info.path.lastIndexOf(msg_dir, 0)).to.equal(0);284 expect(info.fname.lastIndexOf(Buffer.from('foo').toString('hex') + '@', 0)).to.equal(0);285 expect(info.topic_path).to.equal(undefined);286 if (info.data !== undefined)287 {288 pub_info.data = info.data;289 }290 if (info.new !== undefined)291 {292 pub_info.new = info.new;293 }294 expect(info).to.eql(pub_info);295 expect(data.toString('utf8')).to.equal('bar');296 expect(cb.num_handlers).to.equal(1);297 fs.readFile(info.path, function (err, data)298 {299 if (ephemeral)300 {301 expect(err.code).to.equal('ENOENT');302 }303 else304 {305 if (err) { return done(err); }306 expect(data.toString()).to.equal('\u0000bar');307 }308 done();309 });310 });311 
fsq.publish('foo', 'bar', function (err, info)312 {313 if (err) { done(err); }314 pub_info = info;315 });316 });317 it('should construct received data only once', function (done)318 {319 var the_data = { foo: 0.435, bar: 'hello' },320 called1 = false,321 called2 = false,322 received_data;323 fsq.subscribe('test', function (data, info, cb)324 {325 expect(info.topic).to.equal('test');326 expect(JSON.parse(data)).to.eql(the_data);327 if (received_data)328 {329 expect(data === received_data).to.equal(true);330 }331 else332 {333 received_data = data;334 }335 called1 = true;336 if (called1 && called2)337 {338 cb(null, done);339 }340 else341 {342 cb();343 }344 });345 fsq.subscribe('test', function (data, info, cb)346 {347 expect(info.topic).to.equal('test');348 expect(JSON.parse(data)).to.eql(the_data);349 if (received_data)350 {351 expect(data === received_data).to.equal(true);352 }353 else354 {355 received_data = data;356 }357 called2 = true;358 if (called1 && called2)359 {360 cb(null, done);361 }362 else363 {364 cb();365 }366 });367 fsq.publish('test', JSON.stringify(the_data), function (err)368 {369 if (err) { done(err); }370 });371 });372 it('should support more than 10 subscribers', function (done)373 {374 var the_data = { foo: 0.435, bar: 'hello' },375 counter = 11,376 received_data,377 a = [],378 i;379 function subscribe(cb)380 {381 fsq.subscribe('test', function (data, info, cb)382 {383 expect(info.topic).to.equal('test');384 expect(JSON.parse(data)).to.eql(the_data);385 if (received_data)386 {387 expect(data === received_data).to.equal(true);388 }389 else390 {391 received_data = data;392 }393 counter -= 1;394 if (counter === 0)395 {396 cb(null, done);397 }398 else399 {400 cb();401 }402 }, cb);403 }404 for (i = counter; i > 0; i -= 1)405 {406 a.push(subscribe);407 }408 async.parallel(a, function (err)409 {410 if (err) { return done(err); }411 fsq.publish('test', JSON.stringify(the_data), function (err)412 {413 if (err) { done(err); }414 });415 });416 });417 
it('should support more than 10 subscribers with same handler', function (done)418 {419 fsq.stop_watching(function ()420 {421 var fsq2 = make_fsq(1, 0, { dedup: false }),422 the_data = { foo: 0.435, bar: 'hello' },423 counter = 11,424 received_data,425 a = [],426 i;427 ignore_ebusy(fsq2);428 function handler(data, info, cb)429 {430 expect(info.topic).to.equal('test');431 expect(JSON.parse(data)).to.eql(the_data);432 if (received_data)433 {434 expect(data === received_data).to.equal(true);435 }436 else437 {438 received_data = data;439 }440 counter -= 1;441 if (counter === 0)442 {443 cb(null, function (err)444 {445 fsq2.stop_watching(function ()446 {447 done(err);448 });449 });450 }451 else452 {453 cb();454 }455 }456 function subscribe(cb)457 {458 fsq2.subscribe('test', handler, cb);459 }460 for (i = counter; i > 0; i -= 1)461 {462 a.push(subscribe);463 }464 fsq2.on('start', function ()465 {466 async.parallel(a, function (err)467 {468 if (err) { return done(err); }469 fsq2.publish('test', JSON.stringify(the_data), function (err)470 {471 if (err) { done(err); }472 });473 });474 });475 });476 });477 it('should subscribe to wildcards', function (done)478 {479 var count = 0;480 function received()481 {482 count += 1;483 if (count === 2) { done(); }484 }485 fsq.subscribe('*', function (data, info)486 {487 expect(info.topic).to.equal('foo');488 expect(data.toString('utf8')).to.equal('bar');489 received();490 });491 fsq.subscribe('#', function (data, info)492 {493 expect(info.topic).to.equal('foo');494 expect(data.toString('utf8')).to.equal('bar');495 received();496 });497 fsq.publish('foo', 'bar', function (err)498 {499 if (err) { done(err); }500 });501 });502 it('should only call each handler once', function (done)503 {504 var handler = function (data, info)505 {506 expect(info.topic).to.equal('foo');507 expect(data.toString('utf8')).to.equal('bar');508 done();509 };510 fsq.subscribe('*', handler);511 fsq.subscribe('#', handler);512 fsq.publish('foo', 'bar', function 
(err)513 {514 if (err) { done(err); }515 });516 });517 it('should be able to disable handler dedup', function (done)518 {519 fsq.stop_watching(function ()520 {521 var fsq2 = make_fsq(1, 0, { dedup: false }),522 count_multi = 0,523 count_single = 0;524 ignore_ebusy(fsq2);525 function handler(data, info, cb)526 {527 expect(info.topic).to.equal('foo');528 expect(data.toString('utf8')).to.equal('bar');529 cb(null, function (err)530 {531 if (info.single)532 {533 count_single += 1;534 }535 else536 {537 count_multi += 1;538 }539 if ((count_single === (single_supported ? 1 : 0)) && 540 (count_multi === 2))541 {542 fsq2.stop_watching(function ()543 {544 done(err);545 });546 }547 else 548 {549 if ((count_single > 1) || (count_multi > 2))550 {551 throw new Error('called too many times');552 }553 }554 });555 }556 fsq2.on('start', function ()557 {558 fsq2.subscribe('*', handler);559 fsq2.subscribe('#', handler);560 fsq2.publish('foo', 'bar', function (err)561 {562 if (err) { done(err); }563 });564 fsq2.publish('foo', 'bar', { single: true }, function (err)565 {566 if (err) { done(err); }567 });568 });569 });570 });571 it('should call all handlers on a topic for pubsub', function (done)572 {573 var count = 0;574 function received()575 {576 count += 1;577 if (count === 2) { done(); }578 }579 function handler(data, info)580 {581 expect(info.topic).to.equal('foo');582 expect(data.toString('utf8')).to.equal('bar');583 received();584 }585 fsq.subscribe('foo', function () { handler.apply(this, arguments); });586 fsq.subscribe('foo', function () { handler.apply(this, arguments); });587 fsq.publish('foo', 'bar', function (err)588 {589 if (err) { done(err); }590 });591 });592 if (single_supported)593 {594 it('should support a work queue', function (done)595 {596 fsq.subscribe('foo', function (data, info, cb)597 {598 expect(info.topic).to.equal('foo');599 expect(info.single).to.equal(true);600 expect(info.direct).to.equal(false);601 expect(info.path.lastIndexOf(msg_dir, 
0)).to.equal(0);602 expect(info.fname.lastIndexOf(Buffer.from('foo').toString('hex') + '@', 0)).to.equal(0);603 expect(data.toString('utf8')).to.equal('bar');604 fs.stat(info.path, function (err)605 {606 expect(err).to.equal(null);607 fs.open(info.path, 'r+', function (err, fd)608 {609 expect(err).to.equal(null);610 orig_flock(fd, 'exnb', function (err)611 {612 expect(err.code).to.be.oneOf(['EAGAIN', 'EWOULDBLOCK']);613 orig_close(fd, function (err)614 {615 expect(err).to.equal(null);616 cb(null, function ()617 {618 fs.stat(info.fname, function (err)619 {620 expect(err.code).to.equal('ENOENT');621 done();622 });623 });624 });625 });626 });627 });628 });629 fsq.publish('foo', 'bar', { single: true }, function (err)630 {631 if (err) { done(err); }632 });633 });634 }635 it('should guard against calling subscribe callback twice', function (done)636 {637 fsq.on('warning', function (err)638 {639 if (err && (err.code !== 'EBUSY'))640 {641 throw new Error('should not be called');642 }643 });644 fsq.subscribe('foo', function (data, info, cb)645 {646 expect(info.single).to.equal(single_supported);647 cb(null, function (err)648 {649 if (err) { return done(err); }650 setTimeout(function ()651 {652 cb(null, done);653 }, 2000);654 });655 }, function (err)656 {657 if (err) { return done(err); }658 fsq.publish('foo', 'bar', { single: single_supported }, function (err)659 {660 if (err) { done(err); }661 });662 });663 });664 if (single_supported)665 {666 it('should only give work to one worker', function (done)667 {668 this.timeout(30000);669 var fsq2 = make_fsq(2, 1),670 called = false;671 ignore_ebusy(fsq2);672 function handler (data, info, cb)673 {674 expect(called).to.equal(false);675 called = true;676 expect(info.topic).to.equal('foo');677 expect(info.single).to.equal(true);678 expect(data.toString('utf8')).to.equal('bar');679 setTimeout(function ()680 {681 cb(null, function (err)682 {683 fsq2.stop_watching(function ()684 {685 done(err);686 });687 });688 }, 2000);689 }690 
fsq.subscribe('foo', function () { handler.apply(this, arguments); });691 fsq.subscribe('foo', function () { handler.apply(this, arguments); });692 693 fsq2.subscribe('foo', function () { handler.apply(this, arguments); });694 fsq2.subscribe('foo', function () { handler.apply(this, arguments); });695 fsq2.on('start', function ()696 {697 fsq.publish('foo', 'bar', { single: true }, function (err)698 {699 if (err) { done(err); }700 });701 });702 });703 it('should put work back on the queue', function (done)704 {705 var count = 0;706 fsq.subscribe('foo', function (data, info, cb)707 {708 count += 1;709 if (count === 1)710 {711 cb('dummy failure');712 }713 else714 {715 cb(null, done);716 }717 });718 fsq.publish('foo', 'bar', { single: true }, function (err)719 {720 if (err) { done(err); }721 });722 });723 }724 it('should allow handlers to refuse work', function (done)725 {726 fsq.stop_watching(function ()727 {728 function handler1()729 {730 throw new Error('should not be called');731 }732 var fsq2; 733 function handler2(data, info, cb)734 {735 cb(null, function (err)736 {737 fsq2.stop_watching(function ()738 {739 done(err);740 });741 });742 }743 fsq2 = make_fsq(1, 0,744 {745 filter: function (info, handlers, cb)746 {747 expect(info.topic).to.equal('foo');748 handlers.delete(handler1);749 cb(null, true, handlers);750 }751 });752 ignore_ebusy(fsq2);753 fsq2.subscribe('foo', handler1);754 fsq2.subscribe('foo', handler2);755 fsq2.on('start', function ()756 {757 fsq2.publish('foo', 'bar', function (err)758 {759 if (err) { done(err); }760 });761 });762 });763 });764 it('should not allow filters to modify qlobber matches', function (done)765 {766 fsq.stop_watching(function ()767 {768 var fsq2, count = 0;769 function handler(data, info, cb)770 {771 expect(count).to.equal(2);772 cb(null, function (err)773 {774 fsq2.stop_watching(function ()775 {776 done(err);777 });778 });779 }780 fsq2 = make_fsq(1, 0,781 {782 filter: function (info, handlers, cb)783 {784 
expect(info.topic).to.equal('foo');785 if (++count === 1)786 {787 handlers.delete(handler);788 return cb(null, false);789 }790 cb(null, true, handlers);791 }792 });793 ignore_ebusy(fsq2);794 fsq2.subscribe('foo', handler);795 fsq2.on('start', function ()796 {797 fsq2.publish('foo', 'bar', function (err)798 {799 if (err) { done(err); }800 });801 });802 });803 });804 it('should be able to set filter by property', function (done)805 {806 function handler1()807 {808 throw new Error('should not be called');809 }810 function handler2(data, info, cb)811 {812 cb(null, done);813 }814 fsq.filters.push(function (info, handlers, cb)815 {816 expect(info.topic).to.equal('foo');817 handlers.delete(handler1);818 cb(null, true, handlers);819 });820 fsq.subscribe('foo', handler1);821 fsq.subscribe('foo', handler2);822 fsq.publish('foo', 'bar', function (err)823 {824 if (err) { done(err); }825 });826 });827 it('should be able to pass filtered handlers as iterator (Set)', function (done)828 {829 function handler1()830 {831 throw new Error('should not be called');832 }833 function handler2(data, info, cb)834 {835 cb(null, done);836 }837 fsq.filters.push(function (info, handlers, cb)838 {839 expect(info.topic).to.equal('foo');840 cb(null, true, wu(handlers).filter(function (h)841 {842 return h !== handler1;843 }));844 });845 fsq.subscribe('foo', handler1);846 fsq.subscribe('foo', handler2);847 fsq.publish('foo', 'bar', function (err)848 {849 if (err) { done(err); }850 });851 });852 it('should be able to pass filtered handlers as iterator (Array)', function (done)853 {854 fsq.stop_watching(function ()855 {856 function handler1()857 {858 throw new Error('should not be called');859 }860 var fsq2; 861 function handler2(data, info, cb)862 {863 cb(null, function (err)864 {865 fsq2.stop_watching(function ()866 {867 done(err);868 });869 });870 }871 fsq2 = make_fsq(1, 0,872 {873 filter: function (info, handlers, cb)874 {875 expect(info.topic).to.equal('foo');876 cb(null, true, 
wu(handlers).filter(function (h)877 {878 return h !== handler1;879 }));880 },881 dedup: false882 });883 ignore_ebusy(fsq2);884 fsq2.subscribe('foo', handler1);885 fsq2.subscribe('foo', handler2);886 fsq2.on('start', function ()887 {888 fsq2.publish('foo', 'bar', function (err)889 {890 if (err) { done(err); }891 });892 });893 });894 });895 it('should support multiple filters', function (done)896 {897 function handler1()898 {899 throw new Error('should not be called');900 }901 function handler2()902 {903 throw new Error('should not be called');904 }905 function handler3(data, info, cb)906 {907 cb(null, done);908 }909 fsq.filters.push(910 function (info, handlers, cb)911 {912 expect(info.topic).to.equal('foo');913 handlers.delete(handler1);914 cb(null, true, handlers);915 },916 function (info, handlers, cb)917 {918 expect(info.topic).to.equal('foo');919 handlers.delete(handler2);920 cb(null, true, handlers);921 }922 );923 fsq.subscribe('foo', handler1);924 fsq.subscribe('foo', handler2);925 fsq.subscribe('foo', handler3);926 fsq.publish('foo', 'bar', function (err)927 {928 if (err) { done(err); }929 });930 });931 it('should not call other filters if error', function (done)932 {933 var called = false;934 fsq.filters.push(935 function (info, handlers, cb)936 {937 expect(info.topic).to.equal('foo');938 cb(new Error('dummy'));939 if (called)940 {941 return done();942 }943 called = true;944 },945 function (unused_info, unused_handlers, unused_cb)946 {947 throw new Error('should not be called');948 }949 );950 fsq.publish('foo', 'bar', function (err)951 {952 if (err) { done(err); }953 });954 });955 it('should not call other filters if not ready', function (done)956 {957 var called = false;958 fsq.filters.push(959 function (info, handlers, cb)960 {961 expect(info.topic).to.equal('foo');962 cb(null, false);963 if (called)964 {965 return done();966 }967 called = true;968 },969 function (unused_info, unused_handlers, unused_cb)970 {971 throw new Error('should not be called');972 
}973 );974 fsq.publish('foo', 'bar', function (err)975 {976 if (err) { done(err); }977 });978 });979 if (single_supported)980 {981 it('should put work back on queue for another handler', function (done)982 {983 fsq.stop_watching(function ()984 {985 function handler(data, info, cb)986 {987 cb('dummy failure');988 }989 var filter_called = false,990 fsq2 = make_fsq(1, 0,991 {992 filter: function (info, handlers, cb)993 {994 expect(info.topic).to.equal('foo');995 expect(info.single).to.equal(true);996 if (filter_called)997 {998 handlers.delete(handler);999 return cb(null, true, handlers);1000 }1001 filter_called = true;1002 cb(null, true, handlers);1003 }1004 });1005 ignore_ebusy(fsq2);1006 fsq2.subscribe('foo', handler);1007 fsq2.subscribe('foo', function (data, info, cb)1008 {1009 if (filter_called)1010 {1011 cb(null, function (err)1012 {1013 fsq2.stop_watching(function ()1014 {1015 done(err);1016 });1017 });1018 }1019 else1020 {1021 cb('dummy failure2');1022 }1023 });1024 fsq2.on('start', function ()1025 {1026 fsq2.publish('foo', 'bar', { single: true }, function (err)1027 {1028 if (err) { done(err); }1029 });1030 });1031 });1032 });1033 it('should put work back on queue for a handler on another queue', function (done)1034 {1035 this.timeout(30000);1036 fsq.stop_watching(function ()1037 {1038 function handler(data, info, cb)1039 {1040 cb('dummy failure');1041 }1042 var filter_called = false,1043 fsq2 = make_fsq(2, 0,1044 {1045 filter: function (info, handlers, cb)1046 {1047 expect(info.topic).to.equal('foo');1048 expect(info.single).to.equal(true);1049 if (filter_called)1050 {1051 handlers.delete(handler);1052 }1053 filter_called = true;1054 cb(null, true, handlers);1055 }1056 }),1057 fsq3 = make_fsq(2, 1),1058 started2 = false,1059 started3 = false;1060 ignore_ebusy(fsq2);1061 ignore_ebusy(fsq3);1062 fsq2.subscribe('foo', handler);1063 fsq3.subscribe('foo', function (data, info, cb)1064 {1065 if (filter_called)1066 {1067 cb(null, function (err)1068 {1069 
fsq2.stop_watching(function ()1070 {1071 fsq3.stop_watching(function ()1072 {1073 done(err);1074 });1075 });1076 });1077 }1078 else1079 {1080 cb('dummy failure2');1081 }1082 });1083 function start()1084 {1085 if (!(started2 && started3)) { return; }1086 fsq2.publish('foo', 'bar', { single: true }, function (err)1087 {1088 if (err) { done(err); }1089 });1090 }1091 fsq2.on('start', function ()1092 {1093 started2 = true;1094 start();1095 });1096 fsq3.on('start', function ()1097 {1098 started3 = true;1099 start();1100 });1101 });1102 });1103 }1104 if (!ephemeral)1105 {1106 it('should allow handlers to delay a message', function (done)1107 {1108 restore();1109 fsq.stop_watching(function ()1110 {1111 var ready_multi = false,1112 ready_single = !single_supported,1113 got_multi = false,1114 got_single = !single_supported,1115 count = 0,1116 fsq2 = make_fsq(1, 0,1117 {1118 filter: function (info, handlers, cb)1119 {1120 expect(info.topic).to.equal('foo');1121 if (info.single)1122 {1123 ready_single = true;1124 }1125 else1126 {1127 ready_multi = true;1128 }1129 if (!single_supported)1130 {1131 expect(info.single).to.equal(false);1132 }1133 count += 1;1134 cb(null, (count % 5) === 0, handlers);1135 }1136 });1137 ignore_ebusy(fsq2);1138 function handler(data, info, cb)1139 {1140 expect(data.toString('utf8')).to.equal('bar');1141 cb(null, function (err)1142 {1143 if (info.single)1144 {1145 expect(got_single).to.equal(false);1146 got_single = true;1147 }1148 else1149 {1150 expect(got_multi).to.equal(false);1151 got_multi = true;1152 }1153 if (!single_supported)1154 {1155 expect(info.single).to.equal(false);1156 }1157 if (got_single && got_multi && ready_single && ready_multi)1158 {1159 expect(count).to.equal(single_supported ? 
10 : 5);1160 fsq2.stop_watching(function ()1161 {1162 done(err);1163 });1164 }1165 });1166 }1167 fsq2.subscribe('foo', handler);1168 fsq2.on('start', function ()1169 {1170 fsq2.publish('foo', 'bar', function (err)1171 {1172 if (err) { done(err); }1173 });1174 fsq2.publish('foo', 'bar', { single: true }, function (err)1175 {1176 if (err) { done(err); }1177 });1178 });1179 });1180 });1181 }1182 it('should emit start and stop events', function (done)1183 {1184 this.timeout(30000);1185 var fsq2 = make_fsq(2, 1);1186 ignore_ebusy(fsq2);1187 fsq2.on('start', function ()1188 {1189 fsq2.stop_watching();1190 fsq2.on('stop', done);1191 });1192 });1193 it('should support per-message time-to-live', function (done)1194 {1195 this.timeout(20000);1196 restore();1197 fsq.subscribe('foo', function ()1198 {1199 setTimeout(function ()1200 {1201 fsq.force_refresh();1202 setTimeout(function ()1203 {1204 check_empty(msg_dir, done, done);1205 }, 500);1206 }, 500);1207 });1208 fsq.publish('foo', 'bar', { ttl: 500 }, function (err)1209 {1210 if (err) { done(err); }1211 });1212 });1213 it('should call error function', function (done)1214 {1215 restore();1216 fsq.on('warning', function (err)1217 {1218 expect(err).to.equal('dummy failure');1219 done();1220 });1221 fsq.subscribe('foo', function (data, info, cb)1222 {1223 cb('dummy failure');1224 });1225 fsq.publish('foo', 'bar', { single : single_supported }, function (err)1226 {1227 if (err) { done(err); }1228 });1229 });1230 if (single_supported)1231 {1232 it('should support custom polling interval', function (done)1233 {1234 this.timeout(30000);1235 restore();1236 var time, count = 0, fsq2 = make_fsq(2, 1, { poll_interval: 50 });1237 ignore_ebusy(fsq2);1238 fsq2.subscribe('foo', function (data, info, cb)1239 {1240 count += 1;1241 var time2 = new Date().getTime();1242 expect(time2 - time).to.be.below(900);1243 time = time2;1244 if (count === 10)1245 {1246 cb(null, function ()1247 {1248 fsq2.stop_watching(done);1249 });1250 }1251 else1252 
{1253 cb('dummy failure');1254 }1255 });1256 1257 fsq2.on('start', function ()1258 {1259 time = new Date().getTime();1260 fsq.publish('foo', 'bar', {single : true}, function (err)1261 {1262 if (err) { done(err); }1263 });1264 });1265 });1266 }1267 it('should support unsubscribing', function (done)1268 {1269 this.timeout(5000);1270 var count = 0;1271 function handler(data, info, cb)1272 {1273 count += 1;1274 if (count > 1)1275 {1276 throw new Error('should not be called');1277 }1278 fsq.unsubscribe('foo', handler, function ()1279 {1280 fsq.publish('foo', 'bar', function (err)1281 {1282 if (err) { done(err); }1283 });1284 setTimeout(function ()1285 {1286 cb(null, done);1287 }, 2000);1288 });1289 }1290 fsq.subscribe('foo', handler);1291 1292 fsq.publish('foo', 'bar', function (err)1293 {1294 if (err) { done(err); }1295 });1296 });1297 it('should support unsubscribing to all handlers for a topic', function (done)1298 {1299 this.timeout(5000);1300 var count = 0;1301 function handler(data, info, cb)1302 {1303 count += 1;1304 if (count > 2)1305 {1306 throw new Error('should not be called');1307 }1308 if (count === 2)1309 {1310 fsq.unsubscribe('foo', undefined, function ()1311 {1312 fsq.publish('foo', 'bar', function (err)1313 {1314 if (err) { done(err); }1315 });1316 setTimeout(function ()1317 {1318 cb(null, done);1319 }, 2000);1320 });1321 }1322 }1323 fsq.subscribe('foo', function (data, info, cb)1324 {1325 handler(data, info, cb);1326 });1327 fsq.subscribe('foo', function (data, info, cb)1328 {1329 handler(data, info, cb);1330 });1331 1332 fsq.publish('foo', 'bar', function (err)1333 {1334 if (err) { done(err); }1335 });1336 });1337 it('should support unsubscribing to all handlers', function (done)1338 {1339 this.timeout(5000);1340 var count = 0;1341 function handler(data, info, cb)1342 {1343 count += 1;1344 if (count > 2)1345 {1346 throw new Error('should not be called');1347 }1348 if (count === 2)1349 {1350 fsq.subscribe('foo2', function ()1351 {1352 throw new 
Error('should not be called');1353 });1354 fsq.unsubscribe(function ()1355 {1356 fsq.publish('foo', 'bar', function (err)1357 {1358 if (err) { done(err); }1359 });1360 fsq.publish('foo2', 'bar2', function (err)1361 {1362 if (err) { done(err); }1363 });1364 setTimeout(function ()1365 {1366 cb(null, done);1367 }, 2000);1368 });1369 }1370 }1371 fsq.subscribe('foo', function (data, info, cb)1372 {1373 handler(data, info, cb);1374 });1375 fsq.subscribe('foo', function (data, info, cb)1376 {1377 handler(data, info, cb);1378 });1379 1380 fsq.publish('foo', 'bar', function (err)1381 {1382 if (err) { done(err); }1383 });1384 });1385 it('should support changing the default time-to-live', function (done)1386 {1387 this.timeout(30000);1388 restore();1389 fsq.stop_watching(function () // stop fsq dequeuing1390 {1391 var got_single = !single_supported,1392 got_multi = false,1393 fsq2 = make_fsq(1, 0,1394 {1395 multi_ttl: 1000,1396 single_ttl: 10001397 });1398 ignore_ebusy(fsq2);1399 fsq2.subscribe('foo', function (data, info, cb)1400 {1401 cb(null, function ()1402 {1403 if (info.single)1404 {1405 got_single = true;1406 }1407 else1408 {1409 got_multi = true;1410 }1411 if (got_single && got_multi)1412 {1413 setTimeout(function ()1414 {1415 fsq2.force_refresh();1416 setTimeout(function ()1417 {1418 check_empty(msg_dir, done, function ()1419 {1420 fsq2.stop_watching(done);1421 });1422 }, 1000);1423 }, 1000);1424 }1425 });1426 });1427 fsq2.on('start', function ()1428 {1429 fsq2.publish('foo', 'bar', function (err)1430 {1431 if (err) { done(err); }1432 });1433 if (single_supported)1434 {1435 fsq2.publish('foo', 'bar', { single: true }, function (err)1436 {1437 if (err) { done(err); }1438 });1439 }1440 });1441 });1442 });1443 it('should publish and receive twice', function (done)1444 {1445 var count_multi = 0,1446 count_single = single_supported ? 
0 : 2;1447 fsq.subscribe('foo', function (data, info, cb)1448 {1449 cb(null, function ()1450 {1451 if (info.single)1452 {1453 count_single += 1;1454 }1455 else1456 {1457 count_multi += 1;1458 }1459 if ((count_single === 2) && (count_multi === 2))1460 {1461 done();1462 }1463 else if ((count_single > 2) || (count_multi > 2))1464 {1465 throw new Error('called too many times');1466 }1467 });1468 });1469 async.timesSeries(2, function (n, cb)1470 {1471 async.eachSeries([true, false], function (single, cb)1472 {1473 fsq.publish('foo', 'bar', { single: single }, function (err)1474 {1475 cb(err);1476 });1477 }, cb);1478 }, function (err)1479 {1480 if (err) { done(err); }1481 });1482 });1483 it('should default to putting messages in module directory', function (done)1484 {1485 var fsq2 = make_fsq(2, 1, { fsq_dir: undefined });1486 ignore_ebusy(fsq2);1487 fsq2.subscribe('foo', function (data, info)1488 {1489 if (use_disruptor)1490 {1491 // fsq and fsq2 use same disruptors1492 expect(info.new).to.be.true;1493 expect(info.data.toString('utf8')).to.equal('bar');1494 }1495 else1496 {1497 throw new Error('should not be called');1498 }1499 });1500 fsq2.subscribe('foo2', function (data, info, cb)1501 {1502 expect(data.toString('utf8')).to.equal('bar2');1503 expect(info.path.lastIndexOf(path.join(__dirname, '..', 'fsq', 'messages'), 0)).to.equal(0);1504 cb(null, function ()1505 {1506 fsq2.stop_watching(done);1507 });1508 });1509 fsq2.on('start', function ()1510 {1511 fsq.publish('foo', 'bar', function (err)1512 {1513 if (err) { done(err); }1514 // wait for publish so EBUSY isn't retrying while fsq is being cleaned up1515 fsq2.publish('foo2', 'bar2', function (err)1516 {1517 if (err) { done(err); }1518 });1519 });1520 });1521 });1522 it('should publish and subscribe to messages with long topics (multi)', function (done)1523 {1524 var arr = [], topic;1525 arr.length = 64 * 1024 + 1;1526 topic = arr.join('a');1527 fsq.subscribe(topic, function (data, info)1528 {1529 
expect(info.topic).to.equal(topic);1530 expect(info.single).to.equal(false);1531 expect(info.path.lastIndexOf(msg_dir, 0)).to.equal(0);1532 expect(info.fname.lastIndexOf(Buffer.from(topic).toString('hex').substr(0, fsq._split_topic_at) + '@', 0)).to.equal(0);1533 expect(data.toString('utf8')).to.equal('bar');1534 var topic_dir = path.dirname(path.dirname(info.topic_path));1535 expect(topic_dir).to.equal(path.join(msg_dir, '..', 'topics'));1536 fs.readFile(info.topic_path, function (err, split)1537 {1538 if (err) { return done(err); }1539 expect(split.toString('utf8')).to.equal(Buffer.from(topic).toString('hex').substr(fsq._split_topic_at));1540 setTimeout(function ()1541 {1542 fsq.force_refresh();1543 setTimeout(function ()1544 {1545 check_empty(msg_dir, done, function ()1546 {1547 check_empty(topic_dir, done, done);1548 });1549 }, 500);1550 }, 1000);1551 });1552 });1553 fsq.publish(topic, 'bar', { ttl: 1000 }, function (err)1554 {1555 if (ephemeral)1556 {1557 expect(err.code).to.equal('buffer-too-small');1558 return done();1559 }1560 if (err) { done(err); }1561 });1562 });1563 if (single_supported)1564 {1565 it('should publish and subscribe to messages with long topics (single)', function (done)1566 {1567 var arr = [], topic;1568 arr.length = 64 * 1024 + 1;1569 topic = arr.join('a');1570 fsq.subscribe(topic, function (data, info, cb)1571 {1572 expect(info.topic).to.equal(topic);1573 expect(info.single).to.equal(true);1574 expect(info.path.lastIndexOf(msg_dir, 0)).to.equal(0);1575 expect(info.fname.lastIndexOf(Buffer.from(topic).toString('hex').substr(0, fsq._split_topic_at) + '@', 0)).to.equal(0);1576 expect(data.toString('utf8')).to.equal('bar');1577 var topic_dir = path.dirname(path.dirname(info.topic_path));1578 expect(topic_dir).to.equal(path.join(msg_dir, '..', 'topics'));1579 fs.readFile(info.topic_path, function (err, split)1580 {1581 if (err) { return done(err); }1582 
expect(split.toString('utf8')).to.equal(Buffer.from(topic).toString('hex').substr(fsq._split_topic_at));1583 cb(null, function ()1584 {1585 setTimeout(function ()1586 {1587 fsq.force_refresh();1588 setTimeout(function ()1589 {1590 check_empty(msg_dir, done, function ()1591 {1592 check_empty(topic_dir, done, done);1593 });1594 }, 500);1595 }, 1000);1596 });1597 });1598 });1599 fsq.publish(topic, 'bar', { ttl: 1000, single: true }, function (err)1600 {1601 if (err) { done(err); }1602 });1603 });1604 }1605 it('should be able to change when a topic file is created', function (done)1606 {1607 fsq.stop_watching(function ()1608 {1609 var topic = 'hellofromfsq',1610 fsq2 = make_fsq(1, 0, { split_topic_at: 5, });1611 ignore_ebusy(fsq2);1612 fsq2.on('start', function ()1613 {1614 fsq2.subscribe(topic, function (data, info)1615 {1616 expect(info.topic).to.equal(topic);1617 expect(info.single).to.equal(false);1618 expect(info.path.lastIndexOf(msg_dir, 0)).to.equal(0);1619 expect(info.fname.lastIndexOf(Buffer.from(topic).toString('hex').substr(0, ephemeral ? 
undefined : 5) + '@', 0)).to.equal(0);1620 expect(data.toString('utf8')).to.equal('bar');1621 if (ephemeral)1622 {1623 expect(info.topic_path).to.be.undefined;1624 return check_empty(msg_dir, done, function ()1625 {1626 fsq2.stop_watching(done);1627 });1628 }1629 var topic_dir = path.dirname(path.dirname(info.topic_path));1630 expect(topic_dir).to.equal(path.join(msg_dir, '..', 'topics'));1631 fs.readFile(info.topic_path, function (err, split)1632 {1633 if (err) { return done(err); }1634 expect(split.toString('utf8')).to.equal(Buffer.from(topic).toString('hex').substr(5));1635 setTimeout(function ()1636 {1637 fsq2.force_refresh();1638 setTimeout(function ()1639 {1640 fsq2.stop_watching(function ()1641 {1642 check_empty(msg_dir, done, function ()1643 {1644 check_empty(topic_dir, done, done);1645 });1646 });1647 }, 5 * 1000);1648 }, 1000);1649 });1650 });1651 fsq2.publish(topic, 'bar', { ttl: 1000 }, function (err)1652 {1653 if (err) { done(err); }1654 });1655 });1656 });1657 });1658 it('should not read multi-worker messages which already exist', function (done)1659 {1660 this.timeout (10 * 1000);1661 fsq.stop_watching(function ()1662 {1663 fsq.publish('foo', 'bar', function (err)1664 {1665 if (err) { return done(err); }1666 var fsq2 = make_fsq(1, 0);1667 ignore_ebusy(fsq2);1668 fsq2.subscribe('foo', function ()1669 {1670 done('should not be called');1671 });1672 fsq2.on('start', function ()1673 {1674 setTimeout(function ()1675 {1676 fsq2.stop_watching(done);1677 }, 5 * 1000);1678 });1679 });1680 });1681 });1682 if (single_supported)1683 {1684 it('should read single worker messages which already exist', function (done)1685 {1686 fsq.stop_watching(function ()1687 {1688 fsq.publish('foo', 'bar', { single: true }, function (err)1689 {1690 if (err) { return done(err); }1691 var fsq2 = make_fsq(1, 0);1692 ignore_ebusy(fsq2);1693 fsq2.subscribe('foo', function (data, info, cb)1694 {1695 cb(null, function ()1696 {1697 fsq2.stop_watching(done);1698 });1699 });1700 });1701 
});1702 });1703 it('should read single worker messages which already exist after subscribing', function (done)1704 {1705 fsq.stop_watching(function ()1706 {1707 fsq.publish('foo', 'bar', { single: true }, function (err)1708 {1709 if (err) { return done(err); }1710 var fsq2 = make_fsq(1, 0);1711 ignore_ebusy(fsq2);1712 fsq2.on('start', function ()1713 {1714 fsq2.subscribe('foo', function (data, info, cb)1715 {1716 cb(null, function ()1717 {1718 fsq2.stop_watching(done);1719 });1720 });1721 });1722 });1723 });1724 });1725 }1726 it('should support streaming interfaces', function (done)1727 {1728 var stream_multi,1729 stream_single,1730 stream_file,1731 sub_multi_called = false,1732 sub_single_called = !single_supported,1733 pub_multi_called = false,1734 pub_single_called = false;1735 function handler(stream, info, cb)1736 {1737 var hash = crypto.createHash('sha256'),1738 len = 0;1739 stream.on('readable', function ()1740 {1741 let chunk;1742 while (chunk = stream.read()) // eslint-disable-line no-cond-assign1743 {1744 len += chunk.length;1745 hash.update(chunk);1746 }1747 });1748 stream.on('end', function ()1749 {1750 expect(len).to.equal(1024 * 1024);1751 expect(hash.digest('hex')).to.equal('268e1a23a9da868b62b12e020061c98449568c4af9cf9070c8738fe1b457ed9c');1752 cb(null, function ()1753 {1754 if (info.single)1755 {1756 expect(sub_single_called).to.equal(false);1757 sub_single_called = true;1758 }1759 else1760 {1761 expect(sub_multi_called).to.equal(false);1762 sub_multi_called = true;1763 }1764 if (pub_multi_called && pub_single_called &&1765 sub_multi_called && sub_single_called)1766 {1767 done();1768 }1769 });1770 });1771 }1772 handler.accept_stream = true;1773 1774 fsq.subscribe('foo', handler);1775 1776 function published(err)1777 {1778 if (err) { return done(err); }1779 if (pub_multi_called && pub_single_called &&1780 sub_multi_called && sub_single_called)1781 {1782 done();1783 }1784 }1785 stream_multi = fsq.publish('foo', function (err)1786 {1787 
expect(pub_multi_called).to.equal(false);1788 pub_multi_called = true;1789 published(err);1790 });1791 stream_single = fsq.publish('foo', { single: true }, function (err)1792 {1793 expect(pub_single_called).to.equal(false);1794 pub_single_called = true;1795 published(err);1796 });1797 stream_file = fs.createReadStream(path.join(__dirname, 'fixtures', 'random'));1798 stream_file.pipe(stream_multi);1799 stream_file.pipe(stream_single);1800 });1801 it('should support direct streaming', function (done)1802 {1803 var stream_mod = require('stream');1804 fsq.stop_watching(function ()1805 {1806 var fsq2 = make_fsq(1, 0, { direct_handler: new class1807 {1808 constructor()1809 {1810 this.streams = new Map();1811 this.gsfp_called = false;1812 this.gsfs_called = false;1813 this.psd_called = false;1814 this.pse_called = false;1815 this.ssd_called = false;1816 }1817 get_stream_for_publish(filename, direct)1818 {1819 this.gsfp_called = true;1820 expect(direct).to.equal('something truthy');1821 const r = new stream_mod.PassThrough();1822 this.streams.set(filename, r);1823 return r;1824 }1825 get_stream_for_subscribers(filename)1826 {1827 this.gsfs_called = true;1828 return this.streams.get(filename);1829 }1830 1831 publish_stream_destroyed(unused_filename, unused_stream)1832 {1833 this.psd_called = true;1834 }1835 publish_stream_expired(unused_filename)1836 {1837 this.pse_called = true;1838 }1839 subscriber_stream_destroyed(filename, stream)1840 {1841 this.ssd_called = true;1842 const s = this.streams.get(filename);1843 expect(s).to.equal(stream);1844 this.streams.delete(filename);1845 }1846 subscriber_stream_ignored(unused_filename)1847 {1848 throw new Error('should not be called');1849 }1850 }()});1851 fsq2.on('start', function ()1852 {1853 var stream_direct,1854 stream_file,1855 sub_called = false,1856 pub_called = false;1857 function check()1858 {1859 if (pub_called && sub_called)1860 {1861 expect(fsq2._direct_handler.gsfp_called).to.be.true;1862 
expect(fsq2._direct_handler.gsfs_called).to.be.true;1863 expect(fsq2._direct_handler.psd_called).to.be.false;1864 expect(fsq2._direct_handler.pse_called).to.be.false;1865 expect(fsq2._direct_handler.ssd_called).to.be.true;1866 fsq2.stop_watching(done);1867 }1868 }1869 function handler(stream, info, cb)1870 {1871 var hash = crypto.createHash('sha256'),1872 len = 0;1873 expect(info.single).to.equal(false);1874 expect(info.direct).to.equal(true);1875 stream.on('readable', function ()1876 {1877 let chunk;1878 while (chunk = stream.read()) // eslint-disable-line no-cond-assign1879 {1880 len += chunk.length;1881 hash.update(chunk);1882 }1883 });1884 stream.on('end', function ()1885 {1886 expect(len).to.equal(1024 * 1024);1887 expect(hash.digest('hex')).to.equal('268e1a23a9da868b62b12e020061c98449568c4af9cf9070c8738fe1b457ed9c');1888 cb(null, function ()1889 {1890 expect(sub_called).to.equal(false);1891 sub_called = true;1892 check();1893 });1894 });1895 }1896 handler.accept_stream = true;1897 1898 fsq2.subscribe('foo', handler);1899 1900 stream_direct = fsq2.publish('foo', { direct: 'something truthy' }, function (err)1901 {1902 if (err) { return done(err); }1903 expect(pub_called).to.equal(false);1904 pub_called = true;1905 check();1906 });1907 stream_file = fs.createReadStream(path.join(__dirname, 'fixtures', 'random'));1908 stream_file.pipe(stream_direct);1909 });1910 });1911 });1912 it('should support direct streaming with readable', function (done)1913 {1914 var stream_mod = require('stream');1915 fsq.stop_watching(function ()1916 {1917 var fsq2 = make_fsq(1, 0, { direct_handler: new class1918 {1919 constructor()1920 {1921 this.streams = new Map();1922 this.gsfp_called = false;1923 this.gsfs_called = false;1924 this.psd_called = false;1925 this.pse_called = false;1926 this.ssd_called = false;1927 }1928 get_stream_for_publish(filename, direct)1929 {1930 this.gsfp_called = true;1931 expect(direct).to.be.an.instanceof(stream_mod.Readable);1932 
this.streams.set(filename, direct);1933 return direct;1934 }1935 get_stream_for_subscribers(filename)1936 {1937 this.gsfs_called = true;1938 return this.streams.get(filename);1939 }1940 publish_stream_destroyed(unused_filename, unused_stream)1941 {1942 this.psd_called = true;1943 }1944 publish_stream_expired(unused_filename)1945 {1946 this.pse_called = true;1947 }1948 subscriber_stream_destroyed(filename, stream)1949 {1950 this.ssd_called = true;1951 const s = this.streams.get(filename);1952 expect(s).to.equal(stream);1953 this.streams.delete(filename);1954 }1955 }()});1956 fsq2.on('start', function ()1957 {1958 var stream_direct,1959 stream_file,1960 sub_called = false,1961 pub_called = false;1962 function check()1963 {1964 if (pub_called && sub_called)1965 {1966 expect(fsq2._direct_handler.gsfp_called).to.be.true;1967 expect(fsq2._direct_handler.gsfs_called).to.be.true;1968 expect(fsq2._direct_handler.psd_called).to.be.false;1969 expect(fsq2._direct_handler.pse_called).to.be.false;1970 expect(fsq2._direct_handler.ssd_called).to.be.true;1971 fsq2.stop_watching(done);1972 }1973 }1974 function handler(stream, info, cb)1975 {1976 var hash = crypto.createHash('sha256'),1977 len = 0;1978 expect(stream).to.equal(stream_file);1979 expect(info.single).to.equal(false);1980 expect(info.direct).to.equal(true);1981 stream.on('readable', function ()1982 {1983 let chunk;1984 while (chunk = stream.read()) // eslint-disable-line no-cond-assign1985 {1986 len += chunk.length;1987 hash.update(chunk);1988 }1989 });1990 stream.on('end', function ()1991 {1992 expect(len).to.equal(1024 * 1024);1993 expect(hash.digest('hex')).to.equal('268e1a23a9da868b62b12e020061c98449568c4af9cf9070c8738fe1b457ed9c');1994 cb(null, function ()1995 {1996 expect(sub_called).to.equal(false);1997 sub_called = true;1998 check();1999 });2000 });2001 }2002 handler.accept_stream = true;2003 2004 fsq2.subscribe('foo', handler);2005 2006 stream_file = fs.createReadStream(path.join(__dirname, 'fixtures', 
'random'));2007 stream_direct = fsq2.publish('foo', { direct: stream_file, single: true }, function (err)2008 {2009 if (err) { return done(err); }2010 expect(pub_called).to.equal(false);2011 pub_called = true;2012 check();2013 });2014 expect(stream_direct).to.equal(stream_file);2015 });2016 });2017 });2018 it('should pipe to more than one stream', function (done)2019 {2020 var stream_mod = require('stream'),2021 done1 = false,2022 done2 = false;2023 function CheckStream()2024 {2025 stream_mod.Writable.call(this);2026 this._hash = crypto.createHash('sha256');2027 this._len = 0;2028 var ths = this;2029 this.on('finish', function ()2030 {2031 ths.emit('done',2032 {2033 digest: ths._hash.digest('hex'),2034 len: ths._len2035 });2036 });2037 }2038 util.inherits(CheckStream, stream_mod.Writable);2039 CheckStream.prototype._write = function (chunk, encoding, callback)2040 {2041 this._len += chunk.length;2042 this._hash.update(chunk);2043 callback();2044 };2045 function check(obj, cb)2046 {2047 expect(obj.len).to.equal(1024 * 1024);2048 expect(obj.digest).to.equal('268e1a23a9da868b62b12e020061c98449568c4af9cf9070c8738fe1b457ed9c');2049 if (done1 && done2)2050 {2051 cb(null, done);2052 }2053 else2054 {2055 cb();2056 }2057 }2058 function handler1(stream, info, cb)2059 {2060 var cs = new CheckStream();2061 cs.on('done', function (obj)2062 {2063 done1 = true;2064 check(obj, cb);2065 });2066 stream.pipe(cs);2067 }2068 function handler2(stream, info, cb)2069 {2070 var cs = new CheckStream();2071 cs.on('done', function (obj)2072 {2073 done2 = true;2074 check(obj, cb);2075 });2076 stream.pipe(cs);2077 }2078 handler1.accept_stream = true;2079 fsq.subscribe('foo', handler1);2080 handler2.accept_stream = true;2081 fsq.subscribe('foo', handler2);2082 fs.createReadStream(path.join(__dirname, 'fixtures', 'random')).pipe(2083 fsq.publish('foo', function (err)2084 {2085 if (err) { return done(err); }2086 }));2087 });2088 it('should not call the same handler with stream and data', function 
(done)2089 {2090 function handler(stream, info, cb)2091 {2092 expect(Buffer.isBuffer(stream)).to.equal(false);2093 var hash = crypto.createHash('sha256'),2094 len = 0;2095 stream.on('readable', function ()2096 {2097 let chunk;2098 while (chunk = stream.read()) // eslint-disable-line no-cond-assign2099 {2100 len += chunk.length;2101 hash.update(chunk);2102 }2103 });2104 stream.on('end', function ()2105 {2106 expect(len).to.equal(1024 * 1024);2107 expect(hash.digest('hex')).to.equal('268e1a23a9da868b62b12e020061c98449568c4af9cf9070c8738fe1b457ed9c');2108 cb(null, done);2109 });2110 }2111 handler.accept_stream = true;2112 fsq.subscribe('foo', handler);2113 fs.createReadStream(path.join(__dirname, 'fixtures', 'random')).pipe(2114 fsq.publish('foo', function (err)2115 {2116 if (err) { return done(err); }2117 }));2118 });2119 it('should use inotify to process messages straight away', function (done)2120 {2121 var fsq2 = make_fsq(2, 1, { poll_interval: 10 * 1000 }), time;2122 ignore_ebusy(fsq2);2123 fsq2.subscribe('foo', function ()2124 {2125 expect(new Date().getTime() - time).to.be.below(fsq2._poll_interval);2126 fsq2.stop_watching(done);2127 });2128 fsq2.on('start', function ()2129 {2130 time = new Date().getTime();2131 fsq2.publish('foo', 'bar');2132 });2133 });2134 it('should be able to disable inotify', function (done)2135 {2136 restore();2137 var fsq2 = make_fsq(2, 1,2138 {2139 poll_interval: 10 * 1000,2140 notify: false2141 }), time;2142 ignore_ebusy(fsq2);2143 fsq2.subscribe('foo', function ()2144 {2145 expect(new Date().getTime() - time).to.be.at.least(fsq2._poll_interval - 1000);2146 fsq2.stop_watching(done);2147 });2148 fsq2.on('start', function ()2149 {2150 // The countdown to the next poll has already started so from here2151 // it may not be poll_interval until the message is received -2152 // which is why we subtract a second above.2153 time = new Date().getTime();2154 fsq2.publish('foo', 'bar', { ttl: 30 * 1000 });2155 });2156 });2157 it('should be able 
to change the size of update stamps', function (done)2158 {2159 fs.stat(path.join(msg_dir, '..', 'update', 'UPDATE'), function (err, stats)2160 {2161 if (err) { return done(err); }2162 expect(stats.size).to.equal(Math.pow(16, 2) * 32);2163 fsq.stop_watching(function ()2164 {2165 var fsq2 = make_fsq(1, 0, { bucket_stamp_size: 64 });2166 ignore_ebusy(fsq2);2167 fsq2.subscribe('foo', function (data)2168 {2169 expect(data.toString('utf8')).to.equal('bar');2170 fsq2.stop_watching(done);2171 });2172 fsq2.on('start', function ()2173 {2174 fs.stat(path.join(msg_dir, '..', 'update', 'UPDATE'), function (err, stats)2175 {2176 if (err) { return done(err); }2177 expect(stats.size).to.equal(Math.pow(16, 2) * 64);2178 fsq2.publish('foo', 'bar');2179 });2180 });2181 });2182 });2183 });2184 it('should be able to disable the update file', function (done)2185 {2186 var update_file = path.join(msg_dir, '..', 'update', 'UPDATE');2187 fs.stat(update_file, function (err, stats)2188 {2189 if (err) { return done(err); }2190 expect(stats.size).to.equal(Math.pow(16, 2) * 32);2191 fsq.stop_watching(function ()2192 {2193 fs.unlink(update_file, function (err)2194 {2195 if (err) { return done(err); }2196 var fsq2 = make_fsq(1, 0, { bucket_stamp_size: 0 });2197 ignore_ebusy(fsq2);2198 fsq2.subscribe('foo', function (data)2199 {2200 expect(data.toString('utf8')).to.equal('bar');2201 fsq2.stop_watching(done);2202 });2203 fsq2.on('start', function ()2204 {2205 fs.stat(update_file, function (err)2206 {2207 expect(err.code).to.equal('ENOENT');2208 fsq2.publish('foo', 'bar');2209 });2210 });2211 });2212 });2213 });2214 });2215 it('should refresh from disk once every 10 seconds by default', function (done)2216 {2217 this.timeout(60000);2218 fsq.stop_watching(function ()2219 {2220 var fsq2 = make_fsq(1, 0);2221 ignore_ebusy(fsq2);2222 var count = 0;2223 process.nextTick(function ()2224 {2225 var orig_process_all = fsq2._process_all;2226 fsq2._process_all = function ()2227 {2228 count += 1;2229 return 
orig_process_all.apply(this, arguments);2230 };2231 });2232 fsq2.on('start', function ()2233 {2234 expect(count).to.equal(256);2235 setTimeout(function ()2236 {2237 expect(count).to.equal(512);2238 fsq2.stop_watching(done);2239 }, 11000);2240 });2241 });2242 });2243 it('should be able to change refresh from disk interval', function (done)2244 {2245 this.timeout(60000);2246 fsq.stop_watching(function ()2247 {2248 var fsq2 = make_fsq(1, 0, { refresh_ttl: 3000 });2249 ignore_ebusy(fsq2);2250 var count = 0;2251 process.nextTick(function ()2252 {2253 var orig_process_all = fsq2._process_all,2254 time = new Date().getTime();2255 fsq2._process_all = function ()2256 {2257 count += 1;2258 var r = orig_process_all.apply(this, arguments);2259 if (count === 1024)2260 {2261 var time2 = new Date().getTime();2262 expect(time2 - time).to.be.at.least(9000);2263 fsq2.stop_watching(done);2264 }2265 return r;2266 };2267 });2268 fsq2.on('start', function ()2269 {2270 expect(count).to.equal(256);2271 });2272 });2273 });2274 if (use_disruptor)2275 {2276 it('should be able to disable refresh from disk', function (done)2277 {2278 this.timeout(60000);2279 fsq.stop_watching(function ()2280 {2281 var fsq2 = make_fsq(1, 0, { refresh_ttl: 0 });2282 ignore_ebusy(fsq2);2283 var count = 0;2284 process.nextTick(function ()2285 {2286 var orig_process_all = fsq2._process_all;2287 fsq2._process_all = function ()2288 {2289 count += 1;2290 return orig_process_all.apply(this, arguments);2291 };2292 });2293 fsq2.on('start', function ()2294 {2295 expect(count).to.equal(0);2296 setTimeout(function ()2297 {2298 expect(count).to.equal(0);2299 fsq2.stop_watching(done);2300 }, 11000);2301 });2302 });2303 });2304 }2305 it('should be able to change the number of random bytes at the end of filenames', function (done)2306 {2307 fsq.subscribe('foo', function (data, info)2308 {2309 var split = info.fname.split('+'), fsq2;2310 expect(split[split.length - 1].length).to.equal(32);2311 fsq.stop_watching(function ()2312 
{2313 fsq2 = make_fsq(1, 0, { unique_bytes: 8 });2314 ignore_ebusy(fsq2);2315 fsq2.subscribe('foo', function (data, info)2316 {2317 var split2 = info.fname.split('+');2318 expect(split2[split2.length - 1].length).to.equal(16);2319 fsq2.stop_watching(done);2320 });2321 fsq2.on('start', function ()2322 {2323 fsq2.publish('foo', 'bar');2324 });2325 });2326 });2327 fsq.publish('foo', 'bar');2328 });2329 it('should read one message at a time by default', function (done)2330 {2331 this.timeout(5 * 60 * 1000);2332 restore();2333 fsq.stop_watching(function ()2334 {2335 var in_call = false,2336 count = 0,2337 fsq2 = make_fsq(1, 0,2338 {2339 poll_interval: 10 * 1000,2340 notify: false2341 });2342 ignore_ebusy(fsq2);2343 function handler (stream, info, cb)2344 {2345 expect(in_call).to.equal(false);2346 in_call = true;2347 count += 1;2348 stream.on('end', function ()2349 {2350 in_call = false;2351 cb(null, count === 5 ? function ()2352 {2353 fsq2.stop_watching(done);2354 } : null);2355 });2356 if (count === 5)2357 {2358 stream.on('data', function () { return undefined; });2359 }2360 else2361 {2362 // give time for other reads to start2363 setTimeout(function ()2364 {2365 stream.on('data', function () { return undefined; });2366 }, 5 * 1000);2367 }2368 }2369 handler.accept_stream = true;2370 2371 fsq2.subscribe('foo', handler);2372 fsq2.on('start', function ()2373 {2374 function cb(err)2375 {2376 if (err) { done(err); }2377 }2378 for (var i = 0; i < 5; i += 1)2379 {2380 fsq2.publish('foo', 'bar', { ttl: 2 * 60 * 1000 }, cb);2381 }2382 });2383 });2384 });2385 it('should be able to read more than one message at a time', function (done)2386 {2387 this.timeout(5 * 60 * 1000);2388 restore();2389 fsq.stop_watching(function ()2390 {2391 var in_call = 0,2392 count = 0,2393 fsq2 = make_fsq(1, 0,2394 {2395 poll_interval: 10 * 1000,2396 notify: false,2397 bucket_base: 10,2398 bucket_num_chars: 2,2399 bucket_concurrency: 5,2400 message_concurrency: 22401 });2402 ignore_ebusy(fsq2);2403 
function handler(stream, info, cb)2404 {2405 expect(in_call).to.be.at.most(9);2406 in_call += 1;2407 count += 1;2408 stream.on('end', function ()2409 {2410 in_call -= 1;2411 cb(null, (count === 25) && (in_call === 0) ? function ()2412 {2413 fsq2.stop_watching(done);2414 } : null);2415 });2416 if (count === 25)2417 {2418 stream.on('data', function () { return undefined; });2419 }2420 else2421 {2422 // give time for other reads to start2423 setTimeout(function ()2424 {2425 stream.on('data', function () { return undefined; });2426 }, 5 * 1000);2427 }2428 }2429 2430 handler.accept_stream = true;2431 fsq2.subscribe('foo', handler);2432 fsq2.on('start', function ()2433 {2434 var i;2435 function cb(err)2436 {2437 if (err) { done(err); }2438 }2439 for (i = 0; i < 25; i += 1)2440 {2441 fsq2.publish('foo', 'bar', { ttl: 2 * 60 * 1000 }, cb);2442 }2443 });2444 });2445 });2446 2447 it('should clear up expired messages', function (done)2448 {2449 var num_queues = 100,2450 num_messages = 500;2451 restore();2452 this.timeout(10 * 60 * 1000);2453 fsq.stop_watching(async function ()2454 {2455 const open_before = await lsof();2456 async.timesSeries(num_queues, function (n, cb)2457 {2458 var fsq = make_fsq(num_queues, n);2459 ignore_ebusy(fsq, os.platform() === 'win32' ? 
'EPERM' : null);2460 fsq.on('start', function ()2461 {2462 cb(null, fsq);2463 });2464 }, function (err, fsqs)2465 {2466 if (err) { return done(err); }2467 expect(fsqs.length).to.equal(num_queues);2468 async.timesSeries(num_messages, function (n, cb)2469 {2470 fsqs[0].publish('foo', 'bar', { ttl: 2 * 1000 }, cb);2471 }, function (err)2472 {2473 if (err) { return done(err); }2474 setTimeout(function ()2475 {2476 async.each(fsqs, function (fsq, next)2477 {2478 fsq.subscribe('foo', function ()2479 {2480 throw new Error('should not be called');2481 });2482 fsq.force_refresh();2483 next();2484 }, function ()2485 {2486 setTimeout(function ()2487 {2488 async.each(fsqs, function (fsq, cb)2489 {2490 fsq.stop_watching(cb);2491 }, function ()2492 {2493 check_empty(msg_dir, done, async function ()2494 {2495 const open_after = await lsof();2496 try2497 {2498 expect(open_after[0]).to.eql(open_before[0]);2499 }2500 catch (ex)2501 {2502 console.error(open_before[1], open_after[1]);2503 throw ex;2504 }2505 done();2506 });2507 });2508 }, 60 * 1000);2509 });2510 }, 2 * 1000);2511 });2512 });2513 });2514 });2515 if (single_supported)2516 {2517 it('should clear up expired message while worker has it locked', function (done)2518 {2519 this.timeout(60 * 1000);2520 restore();2521 fsq.subscribe('foo', function (data, info, cb)2522 {2523 console.log(info);2524 setTimeout(function ()2525 {2526 if (process.platform === 'win32')2527 {2528 get_message_files(msg_dir, function (err, files)2529 {2530 if (err) { return done(err); }2531 expect(files.length).to.equal(1);2532 cb(null, done);2533 });2534 }2535 else2536 {2537 check_empty(msg_dir, done, function ()2538 {2539 cb(null, done);2540 });2541 }2542 }, 30 * 1000);2543 });2544 fsq.publish('foo', 'bar', { single: true, ttl: 5000 }, function (err)2545 {2546 if (err) { done(err); }2547 });2548 });2549 }2550 function bucket_names(base, chars)2551 {2552 var n = Math.pow(base, chars), i, s, r = [], arr = [];2553 for (i = 0; i < n; i += 1)2554 {2555 s = 
base > 1 ? i.toString(base) : '0';2556 arr.length = chars + 1;2557 r.push((arr.join('0') + s).slice(-chars));2558 }2559 return r;2560 }2561 function test_buckets(base, chars)2562 {2563 it('should distribute messages between bucket directories (base=' + base + ', chars=' + chars + ')', function (done)2564 {2565 // This _could_ fail if the hash function happens not to distribute2566 // at least one message into each bucket.2567 var timeout = 10 * 60 * 1000,2568 buckets = {},2569 count = 0,2570 num,2571 fsq2;2572 this.timeout(timeout);2573 function go()2574 {2575 num = Math.pow(base, chars) * 15;2576 fsq2.subscribe('foo', function (data, info)2577 {2578 var mdir = path.join(path.dirname(info.path), '..');2579 expect(mdir).to.equal(msg_dir);2580 buckets[path.basename(path.dirname(info.path))] = true;2581 count += 1;2582 if (count === num)2583 {2584 fs.readdir(mdir, function (err, files)2585 {2586 var names = bucket_names(base, chars);2587 if (err) { return done(err); }2588 expect(files.sort()).to.eql(names);2589 expect(Object.keys(buckets).sort()).to.eql(names);2590 fsq2.stop_watching(done);2591 });2592 }2593 else if (count > num)2594 {2595 throw new Error("called too many times");2596 }2597 }, function (err)2598 {2599 if (err) { done(err); }2600 async.timesLimit(num, 5, function (i, next)2601 {2602 fsq2.publish('foo', 'bar', { ttl: timeout }, function (err)2603 {2604 if (err) { done(err); }2605 next();2606 });2607 });2608 });2609 }2610 if (base === undefined)2611 {2612 base = 16;2613 chars = 2;2614 fsq2 = fsq;2615 go();2616 }2617 else2618 {2619 fsq.stop_watching(function ()2620 {2621 rimraf(fsq_dir, function (err)2622 {2623 if (err) { return done(err); }2624 fsq2 = make_fsq(1, 0,2625 {2626 bucket_base: base,2627 bucket_num_chars: chars,2628 });2629 ignore_ebusy(fsq2);2630 fsq2.on('start', go);2631 });2632 });2633 }2634 });2635 }2636 test_buckets();2637 test_buckets(1, 1);2638 test_buckets(10, 2);2639 test_buckets(26, 1);2640 test_buckets(26, 2);2641 test_buckets(8, 
3);2642 it('should emit an error event if an error occurs before a start event', function (done)2643 {2644 var orig_mkdir = fs.mkdir, fsq2;2645 fs.mkdir = function (dir, cb)2646 {2647 cb('dummy error');2648 };2649 fsq2 = make_fsq(2, 1);2650 ignore_ebusy(fsq2);2651 fsq2.on('error', function (err)2652 {2653 expect(err).to.equal('dummy error');2654 fs.mkdir = orig_mkdir;2655 this.stop_watching(done);2656 });2657 });2658 it('should handle read errors', function (done)2659 {2660 var count = 0,2661 orig_createReadStream = fs.createReadStream;2662 fsq._fs.createReadStream = function ()2663 {2664 return orig_createReadStream.call(this, '');2665 };2666 fsq.on('warning', function (err)2667 {2668 if (err && (err.code === 'ENOENT'))2669 {2670 count += 1;2671 if (!single_supported)2672 {2673 fsq._fs.createReadStream = orig_createReadStream;2674 done();2675 }2676 else if (count === 5) // check single repeats2677 {2678 fsq._fs.createReadStream = orig_createReadStream;2679 }2680 }2681 });2682 fsq.subscribe('foo', function (data, info, cb)2683 {2684 if (!info.single)2685 {2686 expect(use_disruptor).to.equal(true);2687 if (!single_supported)2688 {2689 done();2690 }2691 return;2692 }2693 expect(info.single).to.equal(true); // we throw multi away on error2694 cb(null, done);2695 });2696 fsq.publish('foo', 'bar', function (err)2697 {2698 if (err) { done(err); }2699 });2700 fsq.publish('foo', 'bar', { single: true }, function (err)2701 {2702 if (err) { done(err); }2703 });2704 });2705 it('should pass back write errors when publishing', function (done)2706 {2707 var orig_createWriteStream = fsq._fs.createWriteStream;2708 fsq._fs.createWriteStream = function ()2709 {2710 return orig_createWriteStream.call(this, '');2711 };2712 fsq.publish('foo', 'bar', function (err)2713 {2714 expect(err.code).to.equal('ENOENT');2715 fsq._fs.createWriteStream = orig_createWriteStream;2716 done();2717 });2718 });2719 it('should close stream if error occurs when publishing', function (done)2720 {2721 var 
finished = false;2722 var s = fsq.publish('foo', function (err)2723 {2724 // wait for initial byte write to be done, which holds up prefinish2725 setTimeout(function ()2726 {2727 expect(err.message).to.equal('dummy');2728 expect(finished).to.equal(true);2729 done();2730 }, 500);2731 });2732 s.on('prefinish', function ()2733 {2734 finished = true;2735 });2736 s.emit('error', new Error('dummy'));2737 });2738 it('should close stream if error occurs when publishing after open', function (done)2739 {2740 var finished = false;2741 var s = fsq.publish('foo', function (err)2742 {2743 expect(err.message).to.be.oneOf([2744 'dummy',2745 // Node 12: Stream is destroyed with 'dummy' but the write to2746 // fs_stream is delayed until fs_stream is opened.2747 'Cannot call write after a stream was destroyed'2748 ]);2749 expect(finished).to.equal(true);2750 done();2751 });2752 s.on('prefinish', function ()2753 {2754 finished = true;2755 });2756 if (use_disruptor)2757 {2758 // Not a fs.WriteStream until content exceeds disruptor element size2759 s.write('a', function ()2760 {2761 s.emit('error', new Error('dummy'));2762 });2763 }2764 else2765 {2766 s.on('open', function ()2767 {2768 this.emit('error', new Error('dummy'));2769 });2770 }2771 });2772 it('should handle closed direct stream before given to subscribers', function (done)2773 {2774 var stream_mod = require('stream');2775 fsq.stop_watching(function ()2776 {2777 var fsq2 = make_fsq(1, 0, { direct_handler: new class2778 {2779 constructor()2780 {2781 this.streams = new Map();2782 this.gsfp_called = false;2783 this.gsfs_called = false;2784 this.psd_called = false;2785 this.ssd_called = false;2786 }2787 get_stream_for_publish(filename, direct)2788 {2789 this.gsfp_called = true;2790 expect(direct).to.equal('something truthy');2791 const r = new stream_mod.PassThrough();2792 this.streams.set(filename, r);2793 return r;2794 }2795 get_stream_for_subscribers(filename)2796 {2797 expect(this.gsfs_called).to.be.false;2798 
this.gsfs_called = true;2799 expect(this.gsfp_called).to.be.true;2800 expect(this.psd_called).to.be.true;2801 expect(this.ssd_called).to.be.false;2802 const r = this.streams.get(filename);2803 expect(r).to.be.undefined;2804 fsq2.stop_watching(done);2805 return r;2806 }2807 publish_stream_destroyed(filename, stream)2808 {2809 this.psd_called = true;2810 const s = this.streams.get(filename);2811 expect(s).to.equal(stream);2812 this.streams.delete(filename);2813 }2814 subscriber_stream_destroyed(unused_filename, unused_stream)2815 {2816 this.ssd_called = true;2817 }2818 }()});2819 fsq2.on('start', function ()2820 {2821 var pub_called = false,2822 stream_direct;2823 function handler(unused_stream, unused_info, unused_cb)2824 {2825 done(new Error('should not be called'));2826 }2827 handler.accept_stream = true;2828 fsq2.subscribe('foo', handler);2829 2830 stream_direct = fsq2.publish('foo', { direct: 'something truthy' }, function (err)2831 {2832 if (err) { return done(err); }2833 expect(pub_called).to.equal(false);2834 pub_called = true;2835 });2836 stream_direct.destroy();2837 });2838 });2839 });2840 it('should handle errored direct stream before given to subscribers', function (done)2841 {2842 var stream_mod = require('stream'),2843 warning_called = false;2844 fsq.stop_watching(function ()2845 {2846 var fsq2 = make_fsq(1, 0, { direct_handler: new class2847 {2848 constructor()2849 {2850 this.streams = new Map();2851 this.gsfp_called = false;2852 this.gsfs_called = false;2853 this.psd_called = false;2854 this.ssd_called = false;2855 }2856 get_stream_for_publish(filename, direct)2857 {2858 this.gsfp_called = true;2859 expect(direct).to.equal('something truthy');2860 const r = new stream_mod.PassThrough();2861 this.streams.set(filename, r);2862 return r;2863 }2864 get_stream_for_subscribers(filename)2865 {2866 expect(this.gsfs_called).to.be.false;2867 this.gsfs_called = true;2868 expect(warning_called).to.be.true;2869 expect(this.gsfp_called).to.be.true;2870 
expect(this.psd_called).to.be.true;2871 expect(this.ssd_called).to.be.false;2872 expect(this.streams.has(filename)).to.be.false;2873 fsq2.stop_watching(done);2874 return null;2875 }2876 publish_stream_destroyed(filename, stream)2877 {2878 this.psd_called = true;2879 const s = this.streams.get(filename);2880 expect(s).to.equal(stream);2881 this.streams.delete(filename);2882 }2883 subscriber_stream_destroyed(unused_filename, unused_stream)2884 {2885 this.ssd_called = true;2886 }2887 }()});2888 fsq2.on('start', function ()2889 {2890 var pub_called = false,2891 stream_direct;2892 function handler(unused_stream, unused_info, unused_cb)2893 {2894 done(new Error('should not be called'));2895 }2896 handler.accept_stream = true;2897 fsq2.subscribe('foo', handler);2898 fsq2.on('warning', function (err)2899 {2900 warning_called = true;2901 expect(err.message).to.equal('some error');2902 expect(this._direct_handler.gsfp_called).to.be.true;2903 expect(this._direct_handler.gsfs_called).to.be.false;2904 expect(this._direct_handler.psd_called).to.be.false;2905 expect(this._direct_handler.ssd_called).to.be.false;2906 });2907 2908 stream_direct = fsq2.publish('foo', { direct: 'something truthy' }, function (err)2909 {2910 if (err) { return done(err); }2911 expect(pub_called).to.equal(false);2912 pub_called = true;2913 });2914 stream_direct.destroy(new Error('some error'));2915 });2916 });2917 });2918 it('should support disabling work queue (single messages)', function (done)2919 {2920 fsq.stop_watching(function ()2921 {2922 var fsq2 = make_fsq(1, 0, { single: false }), called = false;2923 ignore_ebusy(fsq2);2924 fsq2.subscribe('foo', function (data, info, unused_cb)2925 {2926 expect(info.topic).to.equal('foo');2927 expect(data.toString('utf8')).to.equal('bar');2928 // should never be called with 'single' messages2929 expect(info.single).to.equal(false);2930 called = true;2931 });2932 fsq2.on('start', function ()2933 {2934 fsq2.publish('foo', 'bar', function (err)2935 {2936 if (err) 
{ done(err); }2937 });2938 fsq2.publish('foo', 'bar', { single: true }, function (err)2939 {2940 if (err) { done(err); }2941 });2942 });2943 setTimeout(function ()2944 {2945 expect(called).to.equal(true);2946 fsq2.stop_watching(done);2947 }, 10000);2948 });2949 });2950 it('should disable work queue (single messages) if fs-ext is not available', function (done)2951 {2952 fsq.stop_watching(function ()2953 {2954 var fsq2 = make_fsq(1, 0);2955 fsq2._require_fs = function (fs)2956 {2957 if (fs !== 'fs')2958 {2959 throw new Error('dummy');2960 }2961 return require(fs);2962 };2963 fsq2.on('start', function ()2964 {2965 expect(this._do_single).to.equal(false);2966 fsq2.stop_watching(done);2967 });2968 });2969 });2970 it('should emit event when fs-ext is not available', function (done)2971 {2972 fsq.stop_watching(function ()2973 {2974 var fsq2 = make_fsq(1, 0);2975 fsq2._require_fs = function (fs)2976 {2977 if (fs !== 'fs')2978 {2979 throw new Error('dummy');2980 }2981 return require(fs);2982 };2983 fsq2.on('single_disabled', function (err)2984 {2985 expect(this._do_single).to.equal(false);2986 expect(err.message).to.equal('dummy');2987 fsq2.stop_watching(done);2988 });2989 });2990 });2991 it('should emit event when getdents is not available', function (done)2992 {2993 fsq.stop_watching(function ()2994 {2995 var fsq2 = make_fsq(1, 0, { getdents_size: 1024 });2996 fsq2._require_getdents = function ()2997 {2998 throw new Error('dummy');2999 };3000 fsq2.on('getdents_disabled', function (err)3001 {3002 expect(err.message).to.equal('dummy');3003 fsq2.stop_watching(done);3004 });3005 });3006 });3007 it('should publish to a topic with an invalid file name character', function (done)3008 {3009 var arr = [], ltopic, rsingle = !single_supported, rmulti = false;3010 arr.length = 64 * 1024 + 1;3011 ltopic = arr.join('\0');3012 fsq.subscribe('*', function (data, info, cb)3013 {3014 if (info.single)3015 {3016 expect(rsingle).to.equal(false);3017 expect(info.topic).to.equal(ltopic);3018 
expect(info.path.lastIndexOf(msg_dir, 0)).to.equal(0);3019 expect(info.fname.lastIndexOf(Buffer.from(ltopic).toString('hex').substr(0, fsq._split_topic_at) + '@', 0)).to.equal(0);3020 expect(data.toString('utf8')).to.equal('test');3021 var topic_dir = path.dirname(path.dirname(info.topic_path));3022 expect(topic_dir).to.equal(path.join(msg_dir, '..', 'topics'));3023 expect(fs.readFileSync(info.topic_path).toString('utf8')).to.equal(Buffer.from(ltopic).toString('hex').substr(fsq._split_topic_at));3024 }3025 else3026 {3027 expect(rmulti).to.equal(false);3028 expect(info.topic).to.equal('\0foo');3029 expect(info.path.lastIndexOf(msg_dir, 0)).to.equal(0);3030 expect(info.fname.lastIndexOf(Buffer.from('\0foo').toString('hex') + '@', 0)).to.equal(0);3031 expect(info.topic_path).to.equal(undefined);3032 expect(data.toString('utf8')).to.equal('bar');3033 }3034 cb(null, function ()3035 {3036 if (info.single)3037 {3038 rsingle = true;3039 }3040 else3041 {3042 rmulti = true;3043 }3044 if (rsingle && rmulti)3045 {3046 fsq.stop_watching(done);3047 }3048 });3049 });3050 fsq.publish('\0foo', 'bar', function (err)3051 {3052 if (err) { done(err); }3053 });3054 fsq.publish(ltopic, 'test', { single: true }, function (err)3055 {3056 if (err) { done(err); }3057 });3058 });3059 it('should error when publishing to a topic with an invalid file name character and topic encoding is disabled', function (done)3060 {3061 fsq.stop_watching(function ()3062 {3063 var fsq2 = make_fsq(1, 0, { encode_topics: false });3064 ignore_ebusy(fsq2);3065 fsq2.on('start', function ()3066 {3067 this.publish('\0foo', 'bar', function (err)3068 {3069 if (ephemeral)3070 {3071 if (err) { return done(err); }3072 }3073 else3074 {3075 if (!err) { return done(new Error('expected an error')); }3076 if (err.code) // 0.12 doesn't set code3077 {3078 expect(err.code).to.be.oneOf(['ENOENT', 'ERR_INVALID_ARG_TYPE', 'ERR_INVALID_ARG_VALUE']);3079 }3080 }3081 this.stop_watching(done);3082 });3083 });3084 });3085 });3086 
it('should not error when publishing to a topic without an invalid file name character and topic encoding is disabled', function (done)3087 {3088 fsq.stop_watching(function ()3089 {3090 var fsq2 = make_fsq(1, 0, { encode_topics: false });3091 ignore_ebusy(fsq2);3092 fsq2.on('start', function ()3093 {3094 var arr = [], ltopic, rsingle = !single_supported, rmulti = false;3095 arr.length = 64 * 1024 + 1;3096 ltopic = arr.join('a');3097 this.subscribe('*', function (data, info, cb)3098 {3099 if (info.single)3100 {3101 expect(rsingle).to.equal(false);3102 expect(info.topic).to.equal(ltopic);3103 expect(info.path.lastIndexOf(msg_dir, 0)).to.equal(0);3104 expect(info.fname.lastIndexOf(ltopic.substr(0, fsq._split_topic_at) + '@', 0)).to.equal(0);3105 expect(data.toString('utf8')).to.equal('test');3106 var topic_dir = path.dirname(path.dirname(info.topic_path));3107 expect(topic_dir).to.equal(path.join(msg_dir, '..', 'topics'));3108 expect(fs.readFileSync(info.topic_path).toString('utf8')).to.equal(ltopic.substr(fsq._split_topic_at));3109 }3110 else3111 {3112 expect(rmulti).to.equal(false);3113 expect(info.topic).to.equal('foo');3114 expect(info.path.lastIndexOf(msg_dir, 0)).to.equal(0);3115 expect(info.fname.lastIndexOf('foo' + '@', 0)).to.equal(0);3116 expect(info.topic_path).to.equal(undefined);3117 expect(data.toString('utf8')).to.equal('bar');3118 }3119 cb(null, function ()3120 {3121 if (info.single)3122 {3123 rsingle = true;3124 }3125 else3126 {3127 rmulti = true;3128 }3129 if (rsingle && rmulti)3130 {3131 fsq2.stop_watching(done);3132 }3133 });3134 });3135 this.publish('foo', 'bar', function (err)3136 {3137 if (err) { done(err); }3138 });3139 this.publish(ltopic, 'test', { single: true }, function (err)3140 {3141 if (err) { done(err); }3142 });3143 });3144 });3145 });3146 it('should support handler concurrency', function (done)3147 {3148 fsq.stop_watching(function ()3149 {3150 var fsq2 = make_fsq(1, 0, { handler_concurrency: 2 });3151 ignore_ebusy(fsq2);3152 
fsq2.on('start', function ()3153 {3154 var streams = [];3155 function handler(s, unused_info)3156 {3157 streams.push(s);3158 if (streams.length === 1)3159 {3160 fsq2.publish('foo', 'bar2', function (err)3161 {3162 if (err) { done(err); }3163 });3164 }3165 else if (streams.length === 2)3166 {3167 read_all(streams[0], function (v)3168 {3169 expect(v.toString()).to.equal('bar');3170 read_all(streams[1], function (v)3171 {3172 expect(v.toString()).to.equal('bar2');3173 fsq2.stop_watching(done);3174 });3175 });3176 }3177 else3178 {3179 done(new Error('called too many times'));3180 }3181 }3182 handler.accept_stream = true;3183 this.subscribe('foo', handler);3184 this.publish('foo', 'bar', function (err)3185 {3186 if (err) { done(err); }3187 });3188 });3189 });3190 });3191 it('should support delivering messages in expiry order', function (done)3192 {3193 restore();3194 fsq.stop_watching(function ()3195 {3196 var fsq2 = make_fsq(1, 0,3197 {3198 poll_interval: 60 * 60 * 1000,3199 notify: false,3200 order_by_expiry: true,3201 multi_ttl: 24 * 60 * 60 * 10003202 });3203 expect(fsq2._bucket_base).to.equal(1);3204 expect(fsq2._bucket_num_chars).to.equal(1);3205 ignore_ebusy(fsq2);3206 fsq2.on('start', function ()3207 {3208 var n = 1000, ttls_out = [], ttls_in = [], expiries_in = [], i;3209 // Need to leave enough time between ttls to account for3210 // time increasing while publishing3211 for (i = 0; i < n; i += 1)3212 {3213 ttls_out.push(Math.round(3214 Math.random() * 18 * 60 // random mins up to 18 hour period3215 + 1 * 60) // plus one hour3216 3217 * 60 * 1000); // convert to milliseconds3218 }3219 function num_sort(x, y)3220 {3221 return x - y;3222 }3223 fsq2.subscribe('foo', function (data, info)3224 {3225 ttls_in.push(parseInt(data.toString()));3226 expiries_in.push(info.expires);3227 3228 if (ttls_in.length === n)3229 {3230 // check expiries are actually in ascending order3231 var sorted_expiries_in = expiries_in.concat();3232 sorted_expiries_in.sort(num_sort);3233 
expect(expiries_in).to.eql(sorted_expiries_in);3234 // check messages are in expected order3235 ttls_out.sort(num_sort);3236 expect(ttls_in).to.eql(ttls_out);3237 this.stop_watching(done);3238 }3239 else if (ttls_in.length > n)3240 {3241 done(new Error('called too many times'));3242 }3243 });3244 async.eachSeries(ttls_out, function (ttl, cb)3245 {3246 fsq2.publish('foo', ttl.toString(), { ttl: ttl }, cb);3247 }, function (err)3248 {3249 expect(err).to.equal(null);3250 expect(ttls_in.length).to.equal(0);3251 fsq2.refresh_now();3252 });3253 });3254 });3255 });3256 it('should support error on stream and calling back', function (done)3257 {3258 var msg;3259 fsq.on('warning', function (err)3260 {3261 if (err.code !== 'EBUSY')3262 {3263 msg = err.message;3264 }3265 });3266 var handler = function (stream, info, cb)3267 {3268 stream.emit('error', new Error('dummy'));3269 cb(new Error('dummy'), function (err)3270 {3271 expect(err).to.equal(null);3272 expect(msg).to.equal('dummy');3273 done();3274 });3275 };3276 handler.accept_stream = true;3277 fsq.subscribe('foo', handler);3278 fsq.publish('foo', { single: single_supported }).end('bar');3279 });3280 it('should support calling back before stream has ended', function (done)3281 {3282 var count = 0;3283 function handler(stream, info, cb)3284 {3285 count += 1;3286 if (count === 2)3287 {3288 cb(null, done);3289 }3290 else3291 {3292 cb();3293 }3294 }3295 handler.accept_stream = true;3296 fsq.subscribe('foo', handler);3297 fsq.publish('foo').end('bar');3298 fsq.publish('foo', { single: single_supported }).end('bar');3299 });3300 it('should end/error stream after called back before stream has ended',3301 function (done)3302 {3303 var count = 0;3304 function handler(stream, info, cb)3305 {3306 stream.on('readable', function ()3307 {3308 var data = this.read();3309 if (use_disruptor && !info.single)3310 {3311 expect(data.toString()).to.equal('bar');3312 }3313 else3314 {3315 expect(data).to.equal(null);3316 }3317 });3318 count += 
1;3319 if (count === 2)3320 {3321 stream.on('end', done);3322 cb();3323 }3324 else3325 {3326 var ended = false, msg;3327 stream.on('end', function ()3328 {3329 ended = true;3330 });3331 stream.on('error', function (err)3332 {3333 msg = err.message;3334 });3335 cb(new Error('dummy'), function (err)3336 {3337 expect(err).not.to.exist;3338 expect(msg).to.equal('dummy');3339 setImmediate(function ()3340 {3341 expect(ended).to.equal(true);3342 fsq.publish('foo', { single: single_supported }).end('bar');3343 });3344 });3345 }3346 }3347 handler.accept_stream = true;3348 fsq.subscribe('foo', handler);3349 fsq.publish('foo').end('bar');3350 });3351 function existing_messages(dedup)3352 {3353 describe('dedup=' + dedup, function ()3354 {3355 it('should support delivering existing messages to subscribers',3356 function(done)3357 {3358 restore();3359 fsq.stop_watching(function ()3360 {3361 var fsq2 = make_fsq(1, 0,3362 {3363 ttl: 10000,3364 dedup: dedup3365 });3366 fsq2.on('start', function ()3367 {3368 fsq2.subscribe('foo', function (data, info)3369 {3370 expect(info.existing).to.equal(undefined);3371 expect(data.toString()).to.equal('bar');3372 expect(info.topic).to.equal('foo');3373 setTimeout(function ()3374 {3375 fsq2.subscribe('foo', function (data2, info2)3376 {3377 expect(info.existing).to.equal(undefined);3378 expect(info2.existing).to.equal(true);3379 expect(data2.toString()).to.equal('bar');3380 expect(info2.topic).to.equal('foo');3381 expect(info2.path).to.equal(info.path);3382 this.stop_watching(done);3383 },3384 {3385 subscribe_to_existing: true3386 });3387 }, 500);3388 });3389 fsq2.publish('foo').end('bar');3390 });3391 });3392 });3393 it('should support delaying existing messages with filter',3394 function (done)3395 {3396 restore();3397 fsq.stop_watching(function ()3398 {3399 var count = 0;3400 function filter(info, handlers, cb)3401 {3402 count += 1;3403 if (count === 1)3404 {3405 // allow through initial pub and sub3406 cb(null, true, handlers);3407 }3408 
else if (count === 2)3409 {3410 // delay first existing3411 cb(null, false);3412 }3413 else if (count === 3)3414 {3415 // subscribe again to existing messages3416 fsq2.subscribe('foo', function (data, info)3417 {3418 expect(info.existing).to.equal(true);3419 expect(data.toString()).to.equal('bar');3420 expect(info.topic).to.equal('foo');3421 },3422 {3423 subscribe_to_existing: true3424 });3425 // and delay again3426 cb(null, false);3427 }3428 else3429 {3430 // allow through existing3431 cb(null, true, handlers);3432 }3433 }3434 var fsq2 = make_fsq(1, 0,3435 {3436 ttl: 10000,3437 dedup: dedup,3438 filter: filter3439 });3440 fsq2.on('start', function ()3441 {3442 fsq2.subscribe('foo', function (data, info)3443 {3444 expect(info.existing).to.equal(undefined);3445 expect(data.toString()).to.equal('bar');3446 expect(info.topic).to.equal('foo');3447 setTimeout(function ()3448 {3449 fsq2.subscribe('foo', function (data2, info2)3450 {3451 expect(info.existing).to.equal(undefined);3452 expect(info2.existing).to.equal(true);3453 expect(data2.toString()).to.equal('bar');3454 expect(info2.topic).to.equal('foo');3455 expect(info2.path).to.equal(info.path);3456 this.stop_watching(done);3457 },3458 {3459 subscribe_to_existing: true3460 });3461 }, 500);3462 });3463 fsq2.publish('foo').end('bar');3464 });3465 });3466 });3467 it('should support delaying existing messages twice',3468 function (done)3469 {3470 restore();3471 fsq.stop_watching(function ()3472 {3473 var count = 0;3474 function filter(info, handlers, cb)3475 {3476 count += 1;3477 if (count === 1)3478 {3479 // allow through initial pub and sub3480 cb(null, true, handlers);3481 }3482 else if (count === 2)3483 {3484 // delay first existing3485 cb(null, false);3486 }3487 else if (count === 3)3488 {3489 // subscribe again to existing messages3490 fsq2.subscribe('foo', function (data, info)3491 {3492 expect(info.existing).to.equal(true);3493 expect(data.toString()).to.equal('bar');3494 expect(info.topic).to.equal('foo');3495 
},3496 {3497 subscribe_to_existing: true3498 });3499 // for old method which checks if has unsubscribed3500 fsq2.unsubscribe('dummy');3501 // and delay again3502 cb(null, false);3503 }3504 else if (count === 4)3505 {3506 // and delay again3507 cb(null, false);3508 }3509 else3510 {3511 // allow through existing3512 cb(null, true, handlers);3513 }3514 }3515 var fsq2 = make_fsq(1, 0,3516 {3517 ttl: 10000,3518 dedup: dedup,3519 filter: filter3520 });3521 fsq2.on('start', function ()3522 {3523 fsq2.subscribe('foo', function (data, info)3524 {3525 expect(info.existing).to.equal(undefined);3526 expect(data.toString()).to.equal('bar');3527 expect(info.topic).to.equal('foo');3528 setTimeout(function ()3529 {3530 fsq2.subscribe('foo', function (data2, info2)3531 {3532 expect(info.existing).to.equal(undefined);3533 expect(info2.existing).to.equal(true);3534 expect(data2.toString()).to.equal('bar');3535 expect(info2.topic).to.equal('foo');3536 expect(info2.path).to.equal(info.path);3537 this.stop_watching(done);3538 },3539 {3540 subscribe_to_existing: true3541 });3542 }, 500);3543 });3544 fsq2.publish('foo').end('bar');3545 });3546 });3547 });3548 it('should subscribe to new messages too',3549 function (done)3550 {3551 restore();3552 fsq.stop_watching(function ()3553 {3554 var fsq2 = make_fsq(1, 0,3555 {3556 ttl: 10000,3557 dedup: dedup3558 });3559 fsq2.on('start', function ()3560 {3561 fsq2.subscribe('foo', function (data, info)3562 {3563 expect(info.existing).to.equal(undefined);3564 expect(data.toString()).to.equal('bar');3565 expect(info.topic).to.equal('foo');3566 this.stop_watching(done);3567 },3568 {3569 subscribe_to_existing: true3570 });3571 // When using Disruptor, there is no cache so we don't3572 // know if we've seen a message. So we regard all as3573 // existing until we've done a poll. 
Thus, start a3574 // refresh (poll) now and then wait a bit for it to3575 // finish.3576 fsq2.refresh_now();3577 setTimeout(function ()3578 {3579 fsq2.publish('foo').end('bar');3580 }, 500);3581 });3582 });3583 });3584 it('should support unsubscribing from existing messages',3585 function(done)3586 {3587 restore();3588 fsq.stop_watching(function ()3589 {3590 var fsq2 = make_fsq(1, 0,3591 {3592 ttl: 10000,3593 dedup: dedup3594 });3595 fsq2.on('start', function ()3596 {3597 fsq2.subscribe('foo', function (data, info)3598 {3599 expect(info.existing).to.equal(undefined);3600 expect(data.toString()).to.equal('bar');3601 expect(info.topic).to.equal('foo');3602 setTimeout(function ()3603 {3604 function handler()3605 {3606 done(new Error('should not be called'));3607 }3608 fsq2.subscribe('foo', handler,3609 {3610 subscribe_to_existing: true3611 });3612 fsq2.unsubscribe('foo', handler);3613 setTimeout(function ()3614 {3615 fsq2.stop_watching(done);3616 }, 1500);3617 }, 500);3618 });3619 fsq2.publish('foo').end('bar');3620 });3621 });3622 });3623 function unsub_delayed_existing(unsub, done)3624 {3625 restore();3626 fsq.stop_watching(function ()3627 {3628 var count = 0;3629 function handler()3630 {3631 done(new Error('should not be called'));3632 }3633 function handler2()3634 {3635 done(new Error('should not be called'));3636 }3637 function filter(info, handlers, cb)3638 {3639 count += 1;3640 if (count === 1)3641 {3642 // allow through initial pub and sub3643 cb(null, true, handlers);3644 }3645 else if (count === 2)3646 {3647 // delay first existing3648 cb(null, false);3649 }3650 else if (count === 3)3651 {3652 // subscribe again to existing messages3653 fsq2.subscribe('foo', handler2,3654 {3655 subscribe_to_existing: true3656 });3657 // unsubscribe3658 unsub(fsq2, handler, handler2);3659 // and delay again3660 cb(null, false);3661 setTimeout(function ()3662 {3663 fsq2.stop_watching(done);3664 }, 1500);3665 }3666 else3667 {3668 // existing message doesn't call filter if 
there3669 // are no handlers3670 done(new Error('called too many times'));3671 }3672 }3673 var fsq2 = make_fsq(1, 0,3674 {3675 ttl: 10000,3676 dedup: dedup,3677 filter: filter...

Full Screen

Full Screen

annict.ts

Source:annict.ts Github

copy

Full Screen

import got from "got";
import z from "zod";
import { redis } from "../../redis";
import { AnimeStatus, UserAnimeStatus, WatchStatus } from "../../type";
import { isNotNull } from "../../utils/is-not-null";

/** Watch states, spelled the way the Annict GraphQL query below names them. */
const zAnnictWatchStatus = z
  .literal("WANNA_WATCH")
  .or(z.literal("WATCHING"))
  .or(z.literal("WATCHED"))
  .or(z.literal("ON_HOLD"))
  .or(z.literal("STOP_WATCHING"));
type AnnictWatchStatus = z.infer<typeof zAnnictWatchStatus>;

/** Maps each Annict watch state onto the shared `WatchStatus` vocabulary. */
const annictToShared: Record<AnnictWatchStatus, WatchStatus> = {
  WANNA_WATCH: "WANT",
  WATCHING: "WATCHING",
  WATCHED: "WATCHED",
  ON_HOLD: "PAUSED",
  STOP_WATCHING: "DROPPED",
};

/** Every Annict watch state; drives both query generation and result folding. */
const ANNICT_WATCH_STATUS = Object.keys(annictToShared) as AnnictWatchStatus[];

/** Shape of one `works(state: ...)` selection in the GraphQL response. */
const zAnnictWorks = z.object({
  nodes: z.array(
    z.object({
      annictId: z.number(),
      malAnimeId: z.nullable(z.string()),
    }),
  ),
});

/**
 * Builds the per-state part of the user schema: one `zAnnictWorks` entry for
 * each watch state, keyed by the state name (which is also the GraphQL alias).
 */
function generateZAnnictUserWorks() {
  const obj: Partial<Record<AnnictWatchStatus, undefined | typeof zAnnictWorks>> = {};
  for (const status of ANNICT_WATCH_STATUS) {
    obj[status] = zAnnictWorks;
  }
  return obj as Record<AnnictWatchStatus, typeof zAnnictWorks>;
}

/** Redis key prefix; the lower-cased username is appended to it. */
const prefix = "sanime:watchlist:annict:v1:";

/** Shape of one user object in the GraphQL response (and of a cache entry). */
const zAnnictUser = z.object({
  username: z.string(),
  avatarUrl: z.string(),
  ...generateZAnnictUserWorks(),
});

/**
 * Fetches the watch lists of the given users from the Annict GraphQL API in a
 * single request, bypassing the cache.
 *
 * @param users Annict usernames to query; an empty list short-circuits to `[]`.
 * @returns The validated user objects found in the response's `data` field.
 * @throws When the request fails or the response does not match `zAnnictUser`.
 */
async function fetchAnnictWatchesRaw(users: string[]): Promise<z.infer<typeof zAnnictUser>[]> {
  if (users.length === 0) return [];

  // One aliased `user(...)` selection per requested user: u0, u1, ...
  let query = `query (${users.map((_, i) => `$u${i}: String!`).join(", ")}) {\n`;
  for (const i of users.keys()) {
    query += `u${i}: user(username: $u${i}) { ...userObj }\n`;
  }
  query += "}\n";
  query += "fragment userObj on User {\n";
  query += "username\n";
  query += "avatarUrl\n";
  // Each watch state is requested under an alias equal to the state name, so
  // the response keys line up with the keys of `zAnnictUser`.
  for (const state of ANNICT_WATCH_STATUS) {
    query += `${state}: works(state: ${state}) { nodes { ...workIds } }\n`;
  }
  query += "}\n";
  query += "fragment workIds on Work {\nannictId\nmalAnimeId\n}";
  console.log(query);

  const annictRes = await got.post<{ data: unknown }>("https://api.annict.com/graphql", {
    body: JSON.stringify({
      query,
      variables: users.reduce<Record<string, string>>((obj, user, i) => {
        obj[`u${i}`] = user;
        return obj;
      }, {}),
    }),
    headers: {
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      "Authorization": `Bearer ${process.env.ANNICT_TOKEN!}`,
      "Content-Type": "application/json",
    },
    responseType: "json",
  });
  const data = z.record(z.string(), zAnnictUser).parse(annictRes.body.data);
  return Object.values(data);
}

/**
 * Returns the watch lists of the given Annict users, serving from the Redis
 * cache where possible and fetching (then caching) the rest.
 *
 * @param users Annict usernames; an empty list short-circuits to `[]`.
 * @returns One entry per cached or fetched user, cached users first.
 */
export async function fetchAnnictWatches(users: string[]): Promise<UserAnimeStatus[]> {
  if (users.length === 0) return [];

  const cached = (await redis.mget(users.map(user => prefix + user.toLowerCase())))
    .filter(isNotNull)
    .map(e => JSON.parse(e) as z.infer<typeof zAnnictUser>);
  const cachedUser = new Set(cached.map(e => e.username.toLowerCase()));

  // Cache keys and `cachedUser` are lower-cased, so the requested name must be
  // lower-cased as well. Comparing the raw name re-fetched every cached user
  // whose requested name contained upper-case letters and returned them twice.
  const fetchedUsers = await fetchAnnictWatchesRaw(
    users.filter(u => !cachedUser.has(u.toLowerCase())),
  );

  // Cache for 3 minutes.
  for (const fetchedUser of fetchedUsers) {
    await redis.set(
      prefix + fetchedUser.username.toLowerCase(),
      JSON.stringify(fetchedUser),
      "EX",
      60 * 3,
    );
  }

  return [...cached, ...fetchedUsers].map(user => {
    return {
      id: `annict:${user.username}`,
      avatarUrl: user.avatarUrl,
      // Flatten the per-state work lists into one list tagged with the shared status.
      works: ANNICT_WATCH_STATUS.reduce<AnimeStatus[]>(
        (works, key) => [
          ...works,
          ...user[key].nodes.map(work => {
            const myAnimeListID = work.malAnimeId == null ? undefined : parseInt(work.malAnimeId, 10);
            return {
              sourceServiceID: `annict:${work.annictId}` as const,
              myAnimeListID,
              status: annictToShared[key],
            };
          }),
        ],
        [],
      ),
    };
  });
}
// NOTE(review): the excerpt this was taken from is cut off ("...") right after
// the `.map(...)` call above; only the function's closing brace was restored.

Full Screen

Full Screen

input.js

Source:input.js Github

copy

Full Screen

import { describe, it } from 'mocha';
import { expect } from 'chai';
import UserInput from '../../src/lib/input';

describe('Respond message is depends on received message', () => {
  // One entry per suite: the suite title, the UserInput predicate it exercises,
  // and [test title, raw message, expected predicate result] triples.
  const suites = [
    {
      title: '/help',
      predicate: 'isHelp',
      cases: [
        ["'/help' message is received", '/help', true],
        ["'random/help' message is received", 'random/help', false],
      ],
    },
    {
      title: '/link',
      predicate: 'isLink',
      cases: [
        ["'/link' message is received", '/link', true],
        ["'random/link' message is received", 'random/link', false],
      ],
    },
    {
      title: '/start_watching',
      predicate: 'isStart',
      cases: [
        ["'/start_watching' message is received", '/start_watching', true],
        ["'random/start_watching' message is received", 'random/start_watching', false],
      ],
    },
    {
      title: '/stop_watching',
      predicate: 'isStop',
      cases: [
        ["'/stop_watching' message is received", '/stop_watching', true],
        ["'random/stop' message is received", 'random/stop', false],
      ],
    },
    {
      title: 'TravisCI link',
      predicate: 'isValidLink',
      cases: [
        ['valid link is received', 'https://travis-ci.org/hello/world', true],
        ['link without https is not valid', 'travis-ci.org/hello/world', false],
      ],
    },
  ];

  suites.forEach(({ title, predicate, cases }) => {
    describe(title, () => {
      cases.forEach(([testTitle, rawMessage, expected]) => {
        it(testTitle, () => {
          const message = new UserInput(rawMessage);
          expect(message[predicate]()).to.equal(expected);
        });
      });
    });
  });
  // ... (the listing is truncated at this point on the source page)
});

Full Screen

Full Screen

Using AI Code Generation

copy

Full Screen

// Bind the module export to the name the constructor call uses. The original
// bound it to `wpt` and then called `new WebPageTest(...)`, which throws a
// ReferenceError because `WebPageTest` was never defined.
const WebPageTest = require('webpagetest');

// One client is enough for every call below.
const wpt = new WebPageTest('www.webpagetest.org');

/**
 * Shared Node-style callback: logs the error if there is one, otherwise the data.
 * @param {*} err error reported by the client, if any
 * @param {*} data response payload
 */
function logResult(err, data) {
  if (err) console.log(err);
  else console.log(data);
}

// NOTE(review): `stop_watching`, `get_locations` and `get_location` are kept
// exactly as the listing spells them; confirm they exist on the installed
// `webpagetest` client before relying on these calls.
wpt.stop_watching('1234567890', logResult);
wpt.get_locations(logResult);
wpt.get_location('Dulles_IE10', logResult);

Full Screen

Using AI Code Generation

copy

Full Screen

// `wpt-api` is called as a factory with the API key and returns the client.
const createClient = require('wpt-api');
const client = createClient('API_KEY');

// Log the error when there is one; otherwise log the response payload.
client.stop_watching('TEST_ID', (err, data) => {
  if (err) {
    return console.log(err);
  }
  console.log(data);
});

Full Screen

Using AI Code Generation

copy

Full Screen

// The listing repeats the same require before every call; requiring once is
// equivalent because every call goes through the same `wpt` binding.
const wpt = require("wpt");

wpt.stop_watching("test.js");

wpt.watch("test.js", () => {
  console.log("file changed");
});

wpt.get("test.js", (err, data) => {
  if (err) {
    console.log(err);
    return;
  }
  console.log(data);
});

wpt.set("test.js", "var a = 10;", (err) => {
  if (err) {
    console.log(err);
    return;
  }
  console.log("file updated");
});

wpt.exists("test.js", (err, exists) => {
  if (err) {
    console.log(err);
    return;
  }
  console.log("file exists: " + exists);
});

wpt.remove("test.js", (err) => {
  if (err) {
    console.log(err);
    return;
  }
  console.log("file removed");
});

// The listing is truncated inside this callback on the source page; only the
// closing tokens are restored here.
wpt.mkdir("test", (err) => {
  if (err) {
    console.log(err);
    return;
  }
  console.log("directory created");
});

Full Screen

Using AI Code Generation

copy

Full Screen

// The listing repeats the same require before every call; requiring once is
// equivalent because every call goes through the same `wpt` binding.
const wpt = require('wpt');

// Watch a file, then stop watching after 10 seconds.
const watcher = wpt.watch('test.js', (evt, name) => {
  console.log('%s changed.', name);
});
setTimeout(() => {
  watcher.stop_watching();
}, 10000);

// snake_case watch/unwatch calls.
wpt.watch_file('test.js', (evt, name) => {
  console.log('%s changed.', name);
});
wpt.watch_tree('test', (evt, name) => {
  console.log('%s changed.', name);
});
wpt.unwatch_file('test.js');
wpt.unwatch_tree('test');
wpt.unwatch_all();

// camelCase spellings of the same calls, as they appear in the listing.
wpt.watchTree('test', (evt, name) => {
  console.log('%s changed.', name);
});
wpt.unwatchTree('test');
wpt.watchFile('test.js', (evt, name) => {
  console.log('%s changed.', name);
});

Full Screen

Using AI Code Generation

copy

Full Screen

const wpt = require('webpagetest');

// Client bound to the public WebPageTest server.
const wptServer = wpt('www.webpagetest.org');

// NOTE(review): `stop_watching` is kept exactly as the listing spells it;
// confirm it exists on the installed `webpagetest` client.
wptServer.stop_watching('20121023_7E_2d2', (err, data) => {
  if (err) {
    // Stop here. The original fell through and also logged `data` after an
    // error, unlike the sibling snippets, which log one or the other.
    console.log(err);
    return;
  }
  console.log(data);
});

Full Screen

Using AI Code Generation

copy

Full Screen

const WebPageTest = require('webpagetest');

// NOTE(review): the constructor argument looks like an API key rather than a
// server host, and it is hard-coded; confirm the intended arguments and move
// any real key out of source.
const api = new WebPageTest('A.8a7a5c5f5b7f2d2a1b7e1b9a5e7a5d0e');

// No test options are set in the listing.
const options = {};

// Submit a test, then immediately ask the client to stop watching it.
api.runTest(options, (runErr, submitted) => {
  if (runErr) {
    return console.error(runErr);
  }
  console.log('Test submitted. Polling for results.');

  const { testId } = submitted.data;
  api.stopWatching(testId, (stopErr) => {
    if (stopErr) {
      return console.error(stopErr);
    }
    console.log('Test stopped.');
  });
});

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, such as Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run wpt automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful