How to use AsyncMessageQueue method of PuppeteerSharp.Helpers.AsyncMessageQueue class

Best Puppeteer-sharp code snippet using PuppeteerSharp.Helpers.AsyncMessageQueue.AsyncMessageQueue

Run Puppeteer-sharp automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

AsyncMessageQueue.cs

Source: AsyncMessageQueue.cs Github

copy
1using System;
2using System.Collections.Generic;
3using System.Threading;
4using System.Threading.Tasks;
5using Microsoft.Extensions.Logging;
6using Microsoft.Extensions.Logging.Abstractions;
7using PuppeteerSharp.Messaging;
8
9namespace PuppeteerSharp.Helpers
10{
11    /// <summary>
12    /// Provides an async queue for responses for <see cref="CDPSession.SendAsync"/>, so that responses can be handled
13    /// async without risk callers causing a deadlock.
14    /// </summary>
15    internal class AsyncMessageQueue : IDisposable
16    {
17        private readonly List<MessageTask> _pendingTasks;
18        private readonly bool _enqueueAsyncMessages;
19        private readonly ILogger _logger;
20        private bool _disposed;
21
22        public AsyncMessageQueue(bool enqueueAsyncMessages, ILogger logger = null)
23        {
24            _enqueueAsyncMessages = enqueueAsyncMessages;
25            _logger = logger ?? NullLogger.Instance;
26            _pendingTasks = new List<MessageTask>();
27        }
28
29        public void Enqueue(MessageTask callback, ConnectionResponse obj)
30        {
31            if (_disposed)
32            {
33                throw new ObjectDisposedException(GetType().FullName);
34            }
35
36            if (!_enqueueAsyncMessages)
37            {
38                HandleAsyncMessage(callback, obj);
39                return;
40            }
41
42            // Keep a ref to this task until it completes. If it can't finish by the time we dispose this queue,
43            // then we'll find it and cancel it.
44            lock (_pendingTasks)
45            {
46                _pendingTasks.Add(callback);
47            }
48
49            var task = Task.Run(() => HandleAsyncMessage(callback, obj));
50
51            // Unhandled error handler
52            task.ContinueWith(
53                t =>
54                {
55                    _logger.LogError(t.Exception, "Failed to complete async handling of SendAsync for {callback}", callback.Method);
56                    callback.TaskWrapper.TrySetException(t.Exception!); // t.Exception is available since this runs only on faulted
57                },
58                CancellationToken.None,
59                TaskContinuationOptions.OnlyOnFaulted,
60                TaskScheduler.Default);
61
62            // Always remove from the queue when done, regardless of outcome.
63            task.ContinueWith(
64                _ =>
65                {
66                    lock (_pendingTasks)
67                    {
68                        _pendingTasks.Remove(callback);
69                    }
70                },
71                TaskScheduler.Default);
72        }
73
74        public void Dispose()
75        {
76            if (_disposed)
77            {
78                return;
79            }
80
81            // Ensure all tasks are finished since we're disposing now. Any pending tasks will be canceled.
82            MessageTask[] pendingTasks;
83            lock (_pendingTasks)
84            {
85                pendingTasks = _pendingTasks.ToArray();
86                _pendingTasks.Clear();
87            }
88
89            foreach (var pendingTask in pendingTasks)
90            {
91                pendingTask.TaskWrapper.TrySetCanceled();
92            }
93
94            _disposed = true;
95        }
96
97        private static void HandleAsyncMessage(MessageTask callback, ConnectionResponse obj)
98        {
99            if (obj.Error != null)
100            {
101                callback.TaskWrapper.TrySetException(new MessageException(callback, obj.Error));
102            }
103            else
104            {
105                callback.TaskWrapper.TrySetResult(obj.Result);
106            }
107        }
108    }
109}
110
Full Screen

Connection.cs

Source: Connection.cs Github

copy
1using System;
2using System.Collections.Concurrent;
3using System.Linq;
4using System.Threading;
5using System.Threading.Tasks;
6using Microsoft.Extensions.Logging;
7using Newtonsoft.Json;
8using Newtonsoft.Json.Linq;
9using PuppeteerSharp.Helpers;
10using PuppeteerSharp.Helpers.Json;
11using PuppeteerSharp.Messaging;
12using PuppeteerSharp.Transport;
13
14namespace PuppeteerSharp
15{
16    /// <summary>
17    /// A connection handles the communication with a Chromium browser
18    /// </summary>
19    public class Connection : IDisposable
20    {
21        private readonly ILogger _logger;
22        private readonly TaskQueue _callbackQueue = new TaskQueue();
23
24        private readonly ConcurrentDictionary<int, MessageTask> _callbacks;
25        private readonly ConcurrentDictionary<string, CDPSession> _sessions;
26        private readonly AsyncDictionaryHelper<string, CDPSession> _asyncSessions;
27        private int _lastId;
28
29        /// <summary>
30        /// Gets default web socket factory implementation.
31        /// </summary>
32        [Obsolete("Use " + nameof(WebSocketTransport) + "." + nameof(WebSocketTransport.DefaultWebSocketFactory) + " instead")]
33        public static readonly WebSocketFactory DefaultWebSocketFactory = WebSocketTransport.DefaultWebSocketFactory;
34
35        internal Connection(string url, int delay, bool enqueueAsyncMessages, IConnectionTransport transport, ILoggerFactory loggerFactory = null)
36        {
37            LoggerFactory = loggerFactory ?? new LoggerFactory();
38            Url = url;
39            Delay = delay;
40            Transport = transport;
41
42            _logger = LoggerFactory.CreateLogger<Connection>();
43
44            Transport.MessageReceived += Transport_MessageReceived;
45            Transport.Closed += Transport_Closed;
46            _callbacks = new ConcurrentDictionary<int, MessageTask>();
47            _sessions = new ConcurrentDictionary<string, CDPSession>();
48            MessageQueue = new AsyncMessageQueue(enqueueAsyncMessages, _logger);
49            _asyncSessions = new AsyncDictionaryHelper<string, CDPSession>(_sessions, "Session {0} not found");
50        }
51
52        /// <summary>
53        /// Occurs when the connection is closed.
54        /// </summary>
55        public event EventHandler Disconnected;
56
57        /// <summary>
58        /// Occurs when a message from chromium is received.
59        /// </summary>
60        public event EventHandler<MessageEventArgs> MessageReceived;
61
62        internal event EventHandler<SessionAttachedEventArgs> SessionAttached;
63
64        /// <summary>
65        /// Gets the WebSocket URL.
66        /// </summary>
67        /// <value>The URL.</value>
68        public string Url { get; }
69
70        /// <summary>
71        /// Gets the sleep time when a message is received.
72        /// </summary>
73        /// <value>The delay.</value>
74        public int Delay { get; }
75
76        /// <summary>
77        /// Gets the Connection transport.
78        /// </summary>
79        /// <value>Connection transport.</value>
80        public IConnectionTransport Transport { get; }
81
82        /// <summary>
83        /// Gets a value indicating whether this <see cref="Connection"/> is closed.
84        /// </summary>
85        /// <value><c>true</c> if is closed; otherwise, <c>false</c>.</value>
86        public bool IsClosed { get; internal set; }
87
88        /// <summary>
89        /// Connection close reason.
90        /// </summary>
91        public string CloseReason { get; private set; }
92
93        /// <summary>
94        /// Gets the logger factory.
95        /// </summary>
96        /// <value>The logger factory.</value>
97        public ILoggerFactory LoggerFactory { get; }
98
99        internal AsyncMessageQueue MessageQueue { get; }
100
101        internal int GetMessageID() => Interlocked.Increment(ref _lastId);
102
103        internal Task RawSendASync(int id, string method, object args, string sessionId = null)
104        {
105            var message = JsonConvert.SerializeObject(
106                new ConnectionRequest { Id = id, Method = method, Params = args, SessionId = sessionId },
107                JsonHelper.DefaultJsonSerializerSettings);
108            _logger.LogTrace("Send ► {Message}", message);
109
110            return Transport.SendAsync(message);
111        }
112
113        internal async Task<JObject> SendAsync(string method, object args = null, bool waitForCallback = true)
114        {
115            if (IsClosed)
116            {
117                throw new TargetClosedException($"Protocol error({method}): Target closed.", CloseReason);
118            }
119
120            var id = GetMessageID();
121
122            MessageTask callback = null;
123            if (waitForCallback)
124            {
125                callback = new MessageTask
126                {
127                    TaskWrapper = new TaskCompletionSource<JObject>(TaskCreationOptions.RunContinuationsAsynchronously),
128                    Method = method
129                };
130                _callbacks[id] = callback;
131            }
132
133            await RawSendASync(id, method, args).ConfigureAwait(false);
134            return waitForCallback ? await callback.TaskWrapper.Task.ConfigureAwait(false) : null;
135        }
136
137        internal async Task<T> SendAsync<T>(string method, object args = null)
138        {
139            var response = await SendAsync(method, args).ConfigureAwait(false);
140            return response.ToObject<T>(true);
141        }
142
143        internal async Task<CDPSession> CreateSessionAsync(TargetInfo targetInfo)
144        {
145            var sessionId = (await SendAsync<TargetAttachToTargetResponse>("Target.attachToTarget", new TargetAttachToTargetRequest
146            {
147                TargetId = targetInfo.TargetId,
148                Flatten = true
149            }).ConfigureAwait(false)).SessionId;
150            return await GetSessionAsync(sessionId).ConfigureAwait(false);
151        }
152
153        internal bool HasPendingCallbacks() => _callbacks.Count != 0;
154
155        internal void Close(string closeReason)
156        {
157            if (IsClosed)
158            {
159                return;
160            }
161
162            IsClosed = true;
163            CloseReason = closeReason;
164
165            Transport.StopReading();
166            Disconnected?.Invoke(this, new EventArgs());
167
168            foreach (var session in _sessions.Values.ToArray())
169            {
170                session.Close(closeReason);
171            }
172
173            _sessions.Clear();
174
175            foreach (var response in _callbacks.Values.ToArray())
176            {
177                response.TaskWrapper.TrySetException(new TargetClosedException(
178                    $"Protocol error({response.Method}): Target closed.",
179                    closeReason));
180            }
181
182            _callbacks.Clear();
183            MessageQueue.Dispose();
184        }
185
186        internal static Connection FromSession(CDPSession session) => session.Connection;
187
188        internal CDPSession GetSession(string sessionId) => _sessions.GetValueOrDefault(sessionId);
189
190        internal Task<CDPSession> GetSessionAsync(string sessionId) => _asyncSessions.GetItemAsync(sessionId);
191
192        private async void Transport_MessageReceived(object sender, MessageReceivedEventArgs e)
193            => await _callbackQueue.Enqueue(() => ProcessMessage(e)).ConfigureAwait(false);
194
195        private async Task ProcessMessage(MessageReceivedEventArgs e)
196        {
197            try
198            {
199                var response = e.Message;
200                ConnectionResponse obj = null;
201
202                if (response.Length > 0 && Delay > 0)
203                {
204                    await Task.Delay(Delay).ConfigureAwait(false);
205                }
206
207                try
208                {
209                    obj = JsonConvert.DeserializeObject<ConnectionResponse>(response, JsonHelper.DefaultJsonSerializerSettings);
210                }
211                catch (JsonException exc)
212                {
213                    _logger.LogError(exc, "Failed to deserialize response", response);
214                    return;
215                }
216
217                _logger.LogTrace("◀ Receive {Message}", response);
218                ProcessIncomingMessage(obj);
219            }
220            catch (Exception ex)
221            {
222                var message = $"Connection failed to process {e.Message}. {ex.Message}. {ex.StackTrace}";
223                _logger.LogError(ex, message);
224                Close(message);
225            }
226        }
227
228        private void ProcessIncomingMessage(ConnectionResponse obj)
229        {
230            var method = obj.Method;
231            var param = obj.Params?.ToObject<ConnectionResponseParams>();
232
233            if (method == "Target.attachedToTarget")
234            {
235                var sessionId = param.SessionId;
236                var session = new CDPSession(this, param.TargetInfo.Type, sessionId);
237                _asyncSessions.AddItem(sessionId, session);
238
239                SessionAttached?.Invoke(this, new SessionAttachedEventArgs { Session = session });
240
241                if (obj.SessionId != null && _sessions.TryGetValue(obj.SessionId, out var parentSession))
242                {
243                    parentSession.OnSessionAttached(session);
244                }
245            }
246            else if (method == "Target.detachedFromTarget")
247            {
248                var sessionId = param.SessionId;
249                if (_sessions.TryRemove(sessionId, out var session) && !session.IsClosed)
250                {
251                    session.Close("Target.detachedFromTarget");
252                }
253            }
254
255            if (!string.IsNullOrEmpty(obj.SessionId))
256            {
257                var session = GetSession(obj.SessionId);
258                session?.OnMessage(obj);
259            }
260            else if (obj.Id.HasValue)
261            {
262                // If we get the object we are waiting for we return if
263                // if not we add this to the list, sooner or later some one will come for it
264                if (_callbacks.TryRemove(obj.Id.Value, out var callback))
265                {
266                    MessageQueue.Enqueue(callback, obj);
267                }
268            }
269            else
270            {
271                MessageReceived?.Invoke(this, new MessageEventArgs
272                {
273                    MessageID = method,
274                    MessageData = obj.Params
275                });
276            }
277        }
278
279        private void Transport_Closed(object sender, TransportClosedEventArgs e) => Close(e.CloseReason);
280
281        internal static async Task<Connection> Create(string url, IConnectionOptions connectionOptions, ILoggerFactory loggerFactory = null, CancellationToken cancellationToken = default)
282        {
283#pragma warning disable 618
284            var transport = connectionOptions.Transport;
285#pragma warning restore 618
286            if (transport == null)
287            {
288                var transportFactory = connectionOptions.TransportFactory ?? WebSocketTransport.DefaultTransportFactory;
289                transport = await transportFactory(new Uri(url), connectionOptions, cancellationToken).ConfigureAwait(false);
290            }
291
292            return new Connection(url, connectionOptions.SlowMo, connectionOptions.EnqueueAsyncMessages, transport, loggerFactory);
293        }
294
295        /// <inheritdoc />
296        public void Dispose()
297        {
298            Dispose(true);
299            GC.SuppressFinalize(this);
300        }
301
302        /// <summary>
303        /// Releases all resource used by the <see cref="Connection"/> object.
304        /// It will raise the <see cref="Disconnected"/> event and dispose <see cref="Transport"/>.
305        /// </summary>
306        /// <remarks>Call <see cref="Dispose()"/> when you are finished using the <see cref="Connection"/>. The
307        /// <see cref="Dispose()"/> method leaves the <see cref="Connection"/> in an unusable state.
308        /// After calling <see cref="Dispose()"/>, you must release all references to the
309        /// <see cref="Connection"/> so the garbage collector can reclaim the memory that the
310        /// <see cref="Connection"/> was occupying.</remarks>
311        /// <param name="disposing">Indicates whether disposal was initiated by <see cref="Dispose()"/> operation.</param>
312        protected virtual void Dispose(bool disposing)
313        {
314            Close("Connection disposed");
315            Transport.MessageReceived -= Transport_MessageReceived;
316            Transport.Closed -= Transport_Closed;
317            Transport.Dispose();
318            _callbackQueue.Dispose();
319        }
320    }
321}
322
Full Screen

Accelerate Your Automation Test Cycles With LambdaTest

Leverage LambdaTest’s cloud-based platform to execute your automation tests in parallel and trim down your test execution time significantly. Your first 100 automation testing minutes are on us.

Try LambdaTest

Most used method in AsyncMessageQueue

Trigger AsyncMessageQueue code on LambdaTest Cloud Grid

Execute automation tests with AsyncMessageQueue on a cloud-based Grid of 3000+ real browsers and operating systems for both web and mobile applications.

Test now for Free
LambdaTestX

We use cookies to give you the best experience. Cookies help to provide a more personalized experience and relevant advertising for you, and web analytics for us. Learn More in our Cookies policy, Privacy & Terms of service

Allow Cookie
Sarah

I hope you find the best code examples for your project.

If you want to accelerate automated browser testing, try LambdaTest. Your first 100 automation testing minutes are FREE.

Sarah Elson (Product & Growth Lead)