How to use the ProcessIncomingMessage method of the PuppeteerSharp.Connection class

Best Puppeteer-sharp code snippet using PuppeteerSharp.Connection.ProcessIncomingMessage

Run Puppeteer-sharp automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Connection.cs

Source: Connection.cs Github

copy
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using PuppeteerSharp.Helpers;
using PuppeteerSharp.Helpers.Json;
using PuppeteerSharp.Messaging;
using PuppeteerSharp.Transport;
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace PuppeteerSharp
{
    /// <summary>
    /// A connection handles the communication with a Chromium browser
    /// </summary>
    public class Connection : IDisposable
    {
        // Every incoming transport message is enqueued here before being processed
        // (see Transport_MessageReceived).
        private readonly TaskQueue _callbackQueue = new TaskQueue();

        /// <summary>
        /// Initializes a new <see cref="Connection"/> and subscribes to the transport events.
        /// </summary>
        /// <param name="url">URL the connection was created for.</param>
        /// <param name="delay">Delay, passed to <see cref="Task.Delay(int)"/>, applied before each received message is processed.</param>
        /// <param name="transport">Transport used to send and receive messages.</param>
        internal Connection(string url, int delay, IConnectionTransport transport)
        {
            Url = url;
            Delay = delay;
            Transport = transport;

            Transport.MessageReceived += Transport_MessageReceived;
            Transport.Closed += Transport_Closed;
            _callbacks = new ConcurrentDictionary<int, MessageTask>();
            _sessions = new ConcurrentDictionary<string, CDPSession>();
            _asyncSessions = new AsyncDictionaryHelper<string, CDPSession>(_sessions, "Session {0} not found");
        }

        #region Private Members
        // Last message id handed out by GetMessageID.
        private int _lastId;
        // Requests waiting for a response, keyed by message id.
        private readonly ConcurrentDictionary<int, MessageTask> _callbacks;
        // Attached sessions, keyed by session id.
        private readonly ConcurrentDictionary<string, CDPSession> _sessions;
        // Async view over _sessions; presumably lets a caller await a session that has
        // not been registered yet (see GetSessionAsync) -- confirm in AsyncDictionaryHelper.
        private readonly AsyncDictionaryHelper<string, CDPSession> _asyncSessions;
        #endregion

        #region Properties
        /// <summary>
        /// Gets the WebSocket URL.
        /// </summary>
        /// <value>The URL.</value>
        public string Url { get; }
        /// <summary>
        /// Gets the sleep time when a message is received.
        /// </summary>
        /// <value>The delay.</value>
        public int Delay { get; }
        /// <summary>
        /// Gets the Connection transport.
        /// </summary>
        /// <value>Connection transport.</value>
        public IConnectionTransport Transport { get; }
        /// <summary>
        /// Occurs when the connection is closed.
        /// </summary>
        public event EventHandler Disconnected;
        /// <summary>
        /// Occurs when a message from chromium is received.
        /// </summary>
        public event EventHandler<MessageEventArgs> MessageReceived;
        /// <summary>
        /// Gets or sets a value indicating whether this <see cref="Connection"/> is closed.
        /// </summary>
        /// <value><c>true</c> if is closed; otherwise, <c>false</c>.</value>
        public bool IsClosed { get; internal set; }

        /// <summary>
        /// Connection close reason.
        /// </summary>
        public string CloseReason { get; private set; }

        #endregion

        #region Public Methods

        /// <summary>
        /// Atomically increments and returns the next message id.
        /// </summary>
        /// <returns>The new message id.</returns>
        internal int GetMessageID() => Interlocked.Increment(ref _lastId);

        /// <summary>
        /// Serializes a request and sends it through the transport without registering a callback.
        /// </summary>
        /// <param name="id">Message id.</param>
        /// <param name="method">Protocol method name.</param>
        /// <param name="args">Method parameters.</param>
        /// <param name="sessionId">Optional session id to put on the request.</param>
        /// <returns>The task returned by the transport's send operation.</returns>
        internal Task RawSendASync(int id, string method, object args, string sessionId = null)
        {
            return Transport.SendAsync(JsonConvert.SerializeObject(
                new ConnectionRequest
                {
                    Id = id,
                    Method = method,
                    Params = args,
                    SessionId = sessionId
                },
                JsonHelper.DefaultJsonSerializerSettings));
        }

        /// <summary>
        /// Sends a protocol message and, optionally, waits for its response.
        /// </summary>
        /// <param name="method">Protocol method name.</param>
        /// <param name="args">Method parameters.</param>
        /// <param name="waitForCallback">
        /// When <c>true</c> the returned task completes with the response; when <c>false</c>
        /// it completes with <c>null</c> as soon as the message has been sent.
        /// </param>
        /// <returns>The response result, or <c>null</c> when <paramref name="waitForCallback"/> is <c>false</c>.</returns>
        /// <exception cref="TargetClosedException">The connection is already closed, or it is closed while the response is pending.</exception>
        /// <exception cref="MessageException">The response carried an error.</exception>
        internal async Task<JObject> SendAsync(string method, object args = null, bool waitForCallback = true)
        {
            if (IsClosed)
            {
                throw new TargetClosedException($"Protocol error({method}): Target closed.", CloseReason);
            }

            var id = GetMessageID();

            MessageTask callback = null;
            if (waitForCallback)
            {
                callback = new MessageTask
                {
                    TaskWrapper = new TaskCompletionSource<JObject>(),
                    Method = method
                };
                _callbacks[id] = callback;
            }

            try
            {
                await RawSendASync(id, method, args).ConfigureAwait(false);
            }
            catch
            {
                // The request never left, so no response will ever arrive for this id.
                // Drop the registration so it does not linger in _callbacks
                // (and keep HasPendingCallbacks() returning true forever).
                if (waitForCallback)
                {
                    _callbacks.TryRemove(id, out _);
                }

                throw;
            }

            return waitForCallback ? await callback.TaskWrapper.Task.ConfigureAwait(false) : null;
        }

        /// <summary>
        /// Sends a protocol message, waits for its response and converts the result to <typeparamref name="T"/>.
        /// </summary>
        /// <typeparam name="T">Type the response is converted to.</typeparam>
        /// <param name="method">Protocol method name.</param>
        /// <param name="args">Method parameters.</param>
        /// <returns>The converted response.</returns>
        internal async Task<T> SendAsync<T>(string method, object args = null)
        {
            var response = await SendAsync(method, args).ConfigureAwait(false);
            return response.ToObject<T>(true);
        }

        /// <summary>
        /// Sends <c>Target.attachToTarget</c> (with <c>Flatten = true</c>) for the given target
        /// and returns the session registered under the session id found in the response.
        /// </summary>
        /// <param name="targetInfo">Target to attach to.</param>
        /// <returns>The session for the target.</returns>
        internal async Task<CDPSession> CreateSessionAsync(TargetInfo targetInfo)
        {
            var sessionId = (await SendAsync<TargetAttachToTargetResponse>("Target.attachToTarget", new TargetAttachToTargetRequest
            {
                TargetId = targetInfo.TargetId,
                Flatten = true
            }).ConfigureAwait(false)).SessionId;
            return await GetSessionAsync(sessionId).ConfigureAwait(false);
        }

        /// <summary>
        /// Tells whether there are requests still waiting for a response.
        /// </summary>
        /// <returns><c>true</c> if at least one callback is registered.</returns>
        internal bool HasPendingCallbacks() => _callbacks.Count != 0;
        #endregion

        /// <summary>
        /// Closes the connection: stops reading from the transport, raises <see cref="Disconnected"/>,
        /// closes every session and fails every pending request with a <see cref="TargetClosedException"/>.
        /// Does nothing if the connection is already closed.
        /// </summary>
        /// <param name="closeReason">Reason stored in <see cref="CloseReason"/> and passed on to sessions and pending requests.</param>
        internal void Close(string closeReason)
        {
            if (IsClosed)
            {
                return;
            }

            IsClosed = true;
            CloseReason = closeReason;

            Transport.StopReading();
            Disconnected?.Invoke(this, EventArgs.Empty);

            // Iterate over snapshots (ToArray) so the dictionaries can change while we loop.
            foreach (var session in _sessions.Values.ToArray())
            {
                session.Close(closeReason);
            }

            _sessions.Clear();

            foreach (var response in _callbacks.Values.ToArray())
            {
                response.TaskWrapper.TrySetException(new TargetClosedException(
                    $"Protocol error({response.Method}): Target closed.",
                    closeReason));
            }

            _callbacks.Clear();
        }

        /// <summary>
        /// Gets the connection a session belongs to.
        /// </summary>
        /// <param name="session">The session.</param>
        /// <returns>The session's <see cref="CDPSession.Connection"/>.</returns>
        internal static Connection FromSession(CDPSession session) => session.Connection;

        /// <summary>
        /// Looks up a session by id.
        /// </summary>
        /// <param name="sessionId">Session id.</param>
        /// <returns>The session, or the default value (<c>null</c>) when no session is registered under that id.</returns>
        internal CDPSession GetSession(string sessionId) => _sessions.GetValueOrDefault(sessionId);

        /// <summary>
        /// Looks up a session by id through the async session helper.
        /// </summary>
        /// <param name="sessionId">Session id.</param>
        /// <returns>A task producing the session.</returns>
        internal Task<CDPSession> GetSessionAsync(string sessionId) => _asyncSessions.GetItemAsync(sessionId);

        #region Private Methods

        // async void is acceptable here because this is an event handler; ProcessMessage
        // catches every exception itself, so nothing from it escapes this method.
        private async void Transport_MessageReceived(object sender, MessageReceivedEventArgs e)
            => await _callbackQueue.Enqueue(() => ProcessMessage(e)).ConfigureAwait(false);

        /// <summary>
        /// Applies the configured delay, deserializes a raw transport message and dispatches it.
        /// Any unexpected failure closes the connection with a descriptive reason.
        /// </summary>
        /// <param name="e">Event data carrying the raw message.</param>
        private async Task ProcessMessage(MessageReceivedEventArgs e)
        {
            try
            {
                var response = e.Message;
                ConnectionResponse obj = null;

                if (response.Length > 0 && Delay > 0)
                {
                    await Task.Delay(Delay).ConfigureAwait(false);
                }

                try
                {
                    obj = JsonConvert.DeserializeObject<ConnectionResponse>(response, JsonHelper.DefaultJsonSerializerSettings);
                }
                catch (JsonException)
                {
                    // Best effort: a message that cannot be parsed is ignored.
                    return;
                }

                if (obj == null)
                {
                    // Deserialization can yield null (e.g. for an empty or literal "null" payload).
                    // Treat it like an unparsable message instead of letting the dispatch below
                    // throw and take the whole connection down.
                    return;
                }

                ProcessIncomingMessage(obj);
            }
            catch (Exception ex)
            {
                var message = $"Connection failed to process {e.Message}. {ex.Message}. {ex.StackTrace}";
                Close(message);
            }
        }

        /// <summary>
        /// Dispatches a deserialized message: keeps the session table in sync with
        /// attach/detach notifications, then routes the message to its session, to the
        /// pending request it answers, or to <see cref="MessageReceived"/> subscribers.
        /// </summary>
        /// <param name="obj">The deserialized message.</param>
        private void ProcessIncomingMessage(ConnectionResponse obj)
        {
            var method = obj.Method;
            var param = obj.Params?.ToObject<ConnectionResponseParams>();

            if (method == "Target.attachedToTarget")
            {
                var sessionId = param.SessionId;
                var session = new CDPSession(this, param.TargetInfo.Type, sessionId);
                _asyncSessions.AddItem(sessionId, session);
            }
            else if (method == "Target.detachedFromTarget")
            {
                var sessionId = param.SessionId;
                if (_sessions.TryRemove(sessionId, out var session) && !session.IsClosed)
                {
                    session.Close("Target.detachedFromTarget");
                }
            }

            if (!string.IsNullOrEmpty(obj.SessionId))
            {
                // The session may already be gone (detached or closed) by the time one of its
                // messages is processed. Ignore the message rather than dereferencing null,
                // which would surface as an exception in ProcessMessage and close the connection.
                GetSession(obj.SessionId)?.OnMessage(obj);
            }
            else if (obj.Id.HasValue)
            {
                // A message with an id is the response to a request. Complete the matching
                // pending callback if there is one; a response nobody waits for is ignored.
                if (_callbacks.TryRemove(obj.Id.Value, out var callback))
                {
                    if (obj.Error != null)
                    {
                        callback.TaskWrapper.TrySetException(new MessageException(callback, obj.Error));
                    }
                    else
                    {
                        callback.TaskWrapper.TrySetResult(obj.Result);
                    }
                }
            }
            else
            {
                // No session and no id: forward it to MessageReceived subscribers as an event.
                MessageReceived?.Invoke(this, new MessageEventArgs
                {
                    MessageID = method,
                    MessageData = obj.Params
                });
            }
        }

        private void Transport_Closed(object sender, TransportClosedEventArgs e) => Close(e.CloseReason);

        #endregion

        #region Static Methods

        /// <summary>
        /// Gets default web socket factory implementation.
        /// </summary>
        [Obsolete("Use " + nameof(WebSocketTransport) + "." + nameof(WebSocketTransport.DefaultWebSocketFactory) + " instead")]
        public static readonly WebSocketFactory DefaultWebSocketFactory = WebSocketTransport.DefaultWebSocketFactory;

        /// <summary>
        /// Creates a connection, using the transport supplied in the options or, when there is
        /// none, one built by the configured (or default) transport factory.
        /// </summary>
        /// <param name="url">URL to connect to.</param>
        /// <param name="connectionOptions">Connection options (transport, transport factory, slow-mo delay).</param>
        /// <param name="cancellationToken">Token passed to the transport factory.</param>
        /// <returns>A task producing the new <see cref="Connection"/>.</returns>
        internal static async Task<Connection> Create(string url, IConnectionOptions connectionOptions, CancellationToken cancellationToken = default)
        {
#pragma warning disable 618
            var transport = connectionOptions.Transport;
#pragma warning restore 618
            if (transport == null)
            {
                var transportFactory = connectionOptions.TransportFactory ?? WebSocketTransport.DefaultTransportFactory;
                transport = await transportFactory(new Uri(url), connectionOptions, cancellationToken).ConfigureAwait(false);
            }

            return new Connection(url, connectionOptions.SlowMo, transport);
        }

        /// <inheritdoc />
        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        /// <summary>
        /// Releases all resource used by the <see cref="Connection"/> object.
        /// It will raise the <see cref="Disconnected"/> event and dispose <see cref="Transport"/>.
        /// </summary>
        /// <remarks>Call <see cref="Dispose()"/> when you are finished using the <see cref="Connection"/>. The
        /// <see cref="Dispose()"/> method leaves the <see cref="Connection"/> in an unusable state.
        /// After calling <see cref="Dispose()"/>, you must release all references to the
        /// <see cref="Connection"/> so the garbage collector can reclaim the memory that the
        /// <see cref="Connection"/> was occupying.</remarks>
        /// <param name="disposing">Indicates whether disposal was initiated by <see cref="Dispose()"/> operation.</param>
        protected virtual void Dispose(bool disposing)
        {
            Close("Connection disposed");

            // Detach from the transport so it no longer holds delegates pointing at this
            // disposed connection and cannot call back into it.
            Transport.MessageReceived -= Transport_MessageReceived;
            Transport.Closed -= Transport_Closed;

            Transport.Dispose();
        }
        #endregion
    }
}
Full Screen

Accelerate Your Automation Test Cycles With LambdaTest

Leverage LambdaTest’s cloud-based platform to execute your automation tests in parallel and trim down your test execution time significantly. Your first 100 automation testing minutes are on us.

Try LambdaTest

Trigger ProcessIncomingMessage code on LambdaTest Cloud Grid

Execute automation tests with ProcessIncomingMessage on a cloud-based Grid of 3000+ real browsers and operating systems for both web and mobile applications.

Test now for Free
LambdaTest

We use cookies to give you the best experience. Cookies help to provide a more personalized experience and relevant advertising for you, and web analytics for us. Learn More in our Cookies policy, Privacy & Terms of service

Allow Cookie
Sarah

I hope you find the best code examples for your project.

If you want to accelerate automated browser testing, try LambdaTest. Your first 100 automation testing minutes are FREE.

Sarah Elson (Product & Growth Lead)