...34import org.openqa.selenium.events.EventBus;35import org.openqa.selenium.grid.data.Availability;36import org.openqa.selenium.grid.data.CreateSessionRequest;37import org.openqa.selenium.grid.data.CreateSessionResponse;38import org.openqa.selenium.grid.data.NodeDrainComplete;39import org.openqa.selenium.grid.data.NodeDrainStarted;40import org.openqa.selenium.grid.data.NodeHeartBeatEvent;41import org.openqa.selenium.grid.data.NodeId;42import org.openqa.selenium.grid.data.NodeStatus;43import org.openqa.selenium.grid.data.Session;44import org.openqa.selenium.grid.data.SessionClosedEvent;45import org.openqa.selenium.grid.data.Slot;46import org.openqa.selenium.grid.data.SlotId;47import org.openqa.selenium.grid.jmx.JMXHelper;48import org.openqa.selenium.grid.jmx.ManagedAttribute;49import org.openqa.selenium.grid.jmx.ManagedService;50import org.openqa.selenium.grid.node.ActiveSession;51import org.openqa.selenium.grid.node.HealthCheck;52import org.openqa.selenium.grid.node.Node;53import org.openqa.selenium.grid.node.SessionFactory;54import org.openqa.selenium.grid.node.config.NodeOptions;55import org.openqa.selenium.grid.node.docker.DockerAssetsPath;56import org.openqa.selenium.grid.node.local.LocalNode;57import org.openqa.selenium.grid.node.local.SessionSlot;58import org.openqa.selenium.grid.security.Secret;59import org.openqa.selenium.internal.Debug;60import org.openqa.selenium.internal.Either;61import org.openqa.selenium.internal.Require;62import org.openqa.selenium.io.TemporaryFilesystem;63import org.openqa.selenium.io.Zip;64import org.openqa.selenium.json.Json;65import org.openqa.selenium.remote.SessionId;66import org.openqa.selenium.remote.http.HttpMethod;67import org.openqa.selenium.remote.http.HttpRequest;68import org.openqa.selenium.remote.http.HttpResponse;69import org.openqa.selenium.remote.tracing.AttributeKey;70import org.openqa.selenium.remote.tracing.EventAttribute;71import org.openqa.selenium.remote.tracing.EventAttributeValue;72import org.openqa.selenium.remote.tracing.Span;73import org.openqa.selenium.remote.tracing.Status;74import org.openqa.selenium.remote.tracing.Tracer;75import java.io.File;76import java.io.IOException;77import java.io.InputStreamReader;78import java.io.UncheckedIOException;79import java.net.URI;80import java.net.URISyntaxException;81import java.nio.file.Files;82import java.nio.file.Paths;83import java.nio.file.StandardCopyOption;84import java.time.Duration;85import java.time.Instant;86import java.util.Collections;87import java.util.HashMap;88import java.util.List;89import java.util.Map;90import java.util.Optional;91import java.util.Set;92import java.util.UUID;93import java.util.concurrent.ExecutionException;94import java.util.concurrent.atomic.AtomicInteger;95import java.util.logging.Level;96import java.util.logging.Logger;97import java.util.stream.Collectors;98@ManagedService(objectName = "com.saucelabs.grid:type=Node,name=SauceNode",99 description = "SauceNode running the webdriver sessions and uploading results to Sauce.")100public class SauceNode extends Node {101 private static final Logger LOG = Logger.getLogger(LocalNode.class.getName());102 private final EventBus bus;103 private final URI externalUri;104 private final URI gridUri;105 private final Duration heartbeatPeriod;106 private final HealthCheck healthCheck;107 private final int maxSessionCount;108 private final List<SessionSlot> factories;109 private final Cache<SessionId, SessionSlot> currentSessions;110 private final Cache<SessionId, TemporaryFilesystem> tempFileSystems;111 private final AtomicInteger pendingSessions = new AtomicInteger();112 private SauceNode(113 Tracer tracer,114 EventBus bus,115 URI uri,116 URI gridUri,117 HealthCheck healthCheck,118 int maxSessionCount,119 Ticker ticker,120 Duration sessionTimeout,121 Duration heartbeatPeriod,122 List<SessionSlot> factories,123 Secret registrationSecret) {124 super(tracer, new NodeId(UUID.randomUUID()), uri, registrationSecret);125 this.bus = Require.nonNull("Event bus", bus);126 this.externalUri = Require.nonNull("Remote node URI", uri);127 this.gridUri = Require.nonNull("Grid URI", gridUri);128 this.maxSessionCount = Math.min(Require.positive("Max session count", maxSessionCount), factories.size());129 this.heartbeatPeriod = heartbeatPeriod;130 this.factories = ImmutableList.copyOf(factories);131 Require.nonNull("Registration secret", registrationSecret);132 this.healthCheck = healthCheck == null ?133 () -> new HealthCheck.Result(134 isDraining() ? DRAINING : UP,135 String.format("%s is %s", uri, isDraining() ? "draining" : "up")) :136 healthCheck;137 this.currentSessions = CacheBuilder.newBuilder()138 .expireAfterAccess(sessionTimeout)139 .ticker(ticker)140 .removalListener((RemovalListener<SessionId, SessionSlot>) notification -> {141 // Attempt to stop the session142 LOG.log(Debug.getDebugLogLevel(), "Stopping session {0}", notification.getKey().toString());143 SessionSlot slot = notification.getValue();144 if (!slot.isAvailable()) {145 slot.stop();146 }147 })148 .build();149 this.tempFileSystems = CacheBuilder.newBuilder()150 .expireAfterAccess(sessionTimeout)151 .ticker(ticker)152 .removalListener((RemovalListener<SessionId, TemporaryFilesystem>) notification -> {153 TemporaryFilesystem tempFS = notification.getValue();154 tempFS.deleteTemporaryFiles();155 tempFS.deleteBaseDir();156 })157 .build();158 Regularly sessionCleanup = new Regularly("Session Cleanup Node: " + externalUri);159 sessionCleanup.submit(currentSessions::cleanUp, Duration.ofSeconds(30), Duration.ofSeconds(30));160 Regularly tmpFileCleanup = new Regularly("TempFile Cleanup Node: " + externalUri);161 tmpFileCleanup.submit(tempFileSystems::cleanUp, Duration.ofSeconds(30), Duration.ofSeconds(30));162 Regularly regularHeartBeat = new Regularly("Heartbeat Node: " + externalUri);163 regularHeartBeat.submit(() -> bus.fire(new NodeHeartBeatEvent(getStatus())), heartbeatPeriod,164 heartbeatPeriod);165 bus.addListener(SessionClosedEvent.listener(id -> {166 // Listen to session terminated events so we know when to fire the NodeDrainComplete event167 if (this.isDraining()) {168 int done = pendingSessions.decrementAndGet();169 if (done <= 0) {170 LOG.info("Firing node drain complete message");171 bus.fire(new NodeDrainComplete(this.getId()));172 }173 }174 }));175 Runtime.getRuntime().addShutdownHook(new Thread(this::stopAllSessions));176 new JMXHelper().register(this);177 }178 public static SauceNode.Builder builder(179 Tracer tracer,180 EventBus bus,181 URI uri,182 URI gridUri,183 Secret registrationSecret) {184 return new SauceNode.Builder(tracer, bus, uri, gridUri, registrationSecret);185 }186 @Override187 public boolean isReady() {188 return bus.isReady();189 }190 @VisibleForTesting191 public int getCurrentSessionCount() {192 // It seems wildly unlikely we'll overflow an int193 return Math.toIntExact(currentSessions.size());194 }195 @ManagedAttribute(name = "MaxSessions")196 public int getMaxSessionCount() {197 return maxSessionCount;198 }199 @ManagedAttribute(name = "Status")200 public Availability getAvailability() {201 return isDraining() ? DRAINING : UP;202 }203 @ManagedAttribute(name = "TotalSlots")204 public int getTotalSlots() {205 return factories.size();206 }207 @ManagedAttribute(name = "UsedSlots")208 public long getUsedSlots() {209 return factories.stream().filter(sessionSlot -> !sessionSlot.isAvailable()).count();210 }211 @ManagedAttribute(name = "Load")212 public float getLoad() {213 long inUse = factories.stream().filter(sessionSlot -> !sessionSlot.isAvailable()).count();214 return inUse / (float) maxSessionCount * 100f;215 }216 @ManagedAttribute(name = "RemoteNodeUri")217 public URI getExternalUri() {218 return this.getUri();219 }220 @ManagedAttribute(name = "GridUri")221 public URI getGridUri() {222 return this.gridUri;223 }224 @ManagedAttribute(name = "NodeId")225 public String getNodeId() {226 return getId().toString();227 }228 @Override229 public boolean isSupporting(Capabilities capabilities) {230 return factories.parallelStream().anyMatch(factory -> factory.test(capabilities));231 }232 @Override233 public Either<WebDriverException, CreateSessionResponse> newSession(CreateSessionRequest sessionRequest) {234 Require.nonNull("Session request", sessionRequest);235 try (Span span = tracer.getCurrentContext().createSpan("node.new_session")) {236 Map<String, EventAttributeValue> attributeMap = new HashMap<>();237 attributeMap238 .put(AttributeKey.LOGGER_CLASS.getKey(), EventAttribute.setValue(getClass().getName()));239 LOG.fine("Creating new session using span: " + span);240 attributeMap.put("session.request.capabilities",241 EventAttribute.setValue(sessionRequest.getDesiredCapabilities().toString()));242 attributeMap.put("session.request.downstreamdialect",243 EventAttribute.setValue(sessionRequest.getDownstreamDialects().toString()));244 int currentSessionCount = getCurrentSessionCount();245 span.setAttribute("current.session.count", currentSessionCount);246 attributeMap.put("current.session.count", EventAttribute.setValue(currentSessionCount));247 if (getCurrentSessionCount() >= maxSessionCount) {248 span.setAttribute("error", true);249 span.setStatus(Status.RESOURCE_EXHAUSTED);250 attributeMap.put("max.session.count", EventAttribute.setValue(maxSessionCount));251 span.addEvent("Max session count reached", attributeMap);252 return Either.left(new RetrySessionRequestException("Max session count reached."));253 }254 if (isDraining()) {255 span.setStatus(Status.UNAVAILABLE.withDescription("The node is draining. Cannot accept new sessions."));256 return Either.left(257 new RetrySessionRequestException("The node is draining. Cannot accept new sessions."));258 }259 // Identify possible slots to use as quickly as possible to enable concurrent session starting260 SessionSlot slotToUse = null;261 synchronized(factories) {262 for (SessionSlot factory : factories) {263 if (!factory.isAvailable() || !factory.test(sessionRequest.getDesiredCapabilities())) {264 continue;265 }266 factory.reserve();267 slotToUse = factory;268 break;269 }270 }271 if (slotToUse == null) {272 span.setAttribute("error", true);273 span.setStatus(Status.NOT_FOUND);274 span.addEvent("No slot matched capabilities ", attributeMap);275 return Either.left(276 new RetrySessionRequestException("No slot matched the requested capabilities."));277 }278 Either<WebDriverException, ActiveSession> possibleSession = slotToUse.apply(sessionRequest);279 if (possibleSession.isRight()) {280 ActiveSession session = possibleSession.right();281 currentSessions.put(session.getId(), slotToUse);282 SessionId sessionId = session.getId();283 Capabilities caps = session.getCapabilities();284 SESSION_ID.accept(span, sessionId);285 CAPABILITIES.accept(span, caps);286 String downstream = session.getDownstreamDialect().toString();287 String upstream = session.getUpstreamDialect().toString();288 String sessionUri = session.getUri().toString();289 span.setAttribute(AttributeKey.DOWNSTREAM_DIALECT.getKey(), downstream);290 span.setAttribute(AttributeKey.UPSTREAM_DIALECT.getKey(), upstream);291 span.setAttribute(AttributeKey.SESSION_URI.getKey(), sessionUri);292 // The session we return has to look like it came from the node, since we might be dealing293 // with a webdriver implementation that only accepts connections from localhost294 boolean isSupportingCdp = slotToUse.isSupportingCdp() ||295 caps.getCapability("se:cdp") != null;296 Session externalSession = createExternalSession(session, externalUri, isSupportingCdp);297 return Either.right(new CreateSessionResponse(298 externalSession,299 getEncoder(session.getDownstreamDialect()).apply(externalSession)));300 } else {301 slotToUse.release();302 span.setAttribute("error", true);303 span.addEvent("Unable to create session with the driver", attributeMap);304 return Either.left(possibleSession.left());305 }306 }307 }308 @Override309 public boolean isSessionOwner(SessionId id) {310 Require.nonNull("Session ID", id);311 return currentSessions.getIfPresent(id) != null;312 }313 @Override314 public Session getSession(SessionId id) throws NoSuchSessionException {315 Require.nonNull("Session ID", id);316 SessionSlot slot = currentSessions.getIfPresent(id);317 if (slot == null) {318 throw new NoSuchSessionException("Cannot find session with id: " + id);319 }320 return createExternalSession(slot.getSession(), externalUri, slot.isSupportingCdp());321 }322 @Override323 public TemporaryFilesystem getTemporaryFilesystem(SessionId id) throws IOException {324 try {325 return tempFileSystems.get(id, () -> TemporaryFilesystem.getTmpFsBasedOn(326 TemporaryFilesystem.getDefaultTmpFS().createTempDir("session", id.toString())));327 } catch (ExecutionException e) {328 throw new IOException(e);329 }330 }331 @Override332 public HttpResponse executeWebDriverCommand(HttpRequest req) {333 // True enough to be good enough334 SessionId id = getSessionId(req.getUri()).map(SessionId::new)335 .orElseThrow(() -> new NoSuchSessionException("Cannot find session: " + req));336 SessionSlot slot = currentSessions.getIfPresent(id);337 if (slot == null) {338 throw new NoSuchSessionException("Cannot find session with id: " + id);339 }340 ActiveSession activeSession = slot.getSession();341 if (activeSession.getClass().getName().contains("RelaySessionFactory")) {342 HttpResponse toReturn = slot.execute(req);343 if (req.getMethod() == DELETE && req.getUri().equals("/session/" + id)) {344 stop(id);345 }346 return toReturn;347 }348 SauceDockerSession session = (SauceDockerSession) activeSession;349 SauceCommandInfo.Builder builder = new SauceCommandInfo.Builder();350 builder.setStartTime(Instant.now().toEpochMilli());351 HttpResponse toReturn = slot.execute(req);352 if (req.getMethod() == DELETE && req.getUri().equals("/session/" + id)) {353 stop(id);354 builder.setScreenshotId(-1);355 } else {356 // Only taking screenshots after a url has been loaded357 if (!session.canTakeScreenshot() && req.getMethod() == POST358 && req.getUri().endsWith("/url")) {359 session.enableScreenshots();360 }361 int screenshotId = takeScreenshot(session, req, slot);362 builder.setScreenshotId(screenshotId);363 }364 Map<String, Object> parsedResponse =365 JSON.toType(new InputStreamReader(toReturn.getContent().get()), MAP_TYPE);366 builder.setRequest(getRequestContents(req))367 .setResult(parsedResponse)368 .setPath(req.getUri().replace(String.format("/session/%s", id), ""))369 .setHttpStatus(toReturn.getStatus())370 .setHttpMethod(req.getMethod().name())371 .setStatusCode(0);372 if (parsedResponse.containsKey("value") && parsedResponse.get("value") != null373 && parsedResponse.get("value").toString().contains("error")) {374 builder.setStatusCode(1);375 }376 builder.setEndTime(Instant.now().toEpochMilli());377 session.addSauceCommandInfo(builder.build());378 return toReturn;379 }380 @Override381 public HttpResponse uploadFile(HttpRequest req, SessionId id) {382 // When the session is running in a Docker container, the upload file command383 // needs to be forwarded to the container as well.384 SessionSlot slot = currentSessions.getIfPresent(id);385 if (slot != null && slot.getSession() instanceof SauceDockerSession) {386 return executeWebDriverCommand(req);387 }388 Map<String, Object> incoming = JSON.toType(string(req), Json.MAP_TYPE);389 File tempDir;390 try {391 TemporaryFilesystem tempfs = getTemporaryFilesystem(id);392 tempDir = tempfs.createTempDir("upload", "file");393 Zip.unzip((String) incoming.get("file"), tempDir);394 } catch (IOException e) {395 throw new UncheckedIOException(e);396 }397 // Select the first file398 File[] allFiles = tempDir.listFiles();399 if (allFiles == null) {400 throw new WebDriverException(401 String.format("Cannot access temporary directory for uploaded files %s", tempDir));402 }403 if (allFiles.length != 1) {404 throw new WebDriverException(405 String.format("Expected there to be only 1 file. There were: %s", allFiles.length));406 }407 ImmutableMap<String, Object> result = ImmutableMap.of(408 "value", allFiles[0].getAbsolutePath());409 return new HttpResponse().setContent(asJson(result));410 }411 @Override412 public void stop(SessionId id) throws NoSuchSessionException {413 Require.nonNull("Session ID", id);414 SessionSlot slot = currentSessions.getIfPresent(id);415 if (slot == null) {416 throw new NoSuchSessionException("Cannot find session with id: " + id);417 }418 currentSessions.invalidate(id);419 tempFileSystems.invalidate(id);420 }421 private void stopAllSessions() {422 if (currentSessions.size() > 0) {423 LOG.info("Trying to stop all running sessions before shutting down...");424 currentSessions.invalidateAll();425 }426 }427 private Session createExternalSession(ActiveSession other, URI externalUri, boolean isSupportingCdp) {428 Capabilities toUse = ImmutableCapabilities.copyOf(other.getCapabilities());429 // Rewrite the se:options if necessary to send the cdp url back430 if (isSupportingCdp) {431 String cdpPath = String.format("/session/%s/se/cdp", other.getId());432 toUse = new PersistentCapabilities(toUse).setCapability("se:cdp", rewrite(cdpPath));433 }434 return new Session(other.getId(), externalUri, other.getStereotype(), toUse, Instant.now());435 }436 private URI rewrite(String path) {437 try {438 String scheme = "https".equals(gridUri.getScheme()) ? "wss" : "ws";439 return new URI(440 scheme,441 gridUri.getUserInfo(),442 gridUri.getHost(),443 gridUri.getPort(),444 path,445 null,446 null);447 } catch (URISyntaxException e) {448 throw new RuntimeException(e);449 }450 }451 @Override452 public NodeStatus getStatus() {453 Set<Slot> slots = factories.stream()454 .map(slot -> {455 Instant lastStarted = Instant.EPOCH;456 Session session = null;457 if (!slot.isAvailable()) {458 ActiveSession activeSession = slot.getSession();459 if (activeSession != null) {460 lastStarted = activeSession.getStartTime();461 session = new Session(462 activeSession.getId(),463 activeSession.getUri(),464 slot.getStereotype(),465 activeSession.getCapabilities(),466 activeSession.getStartTime());467 }468 }469 return new Slot(470 new SlotId(getId(), slot.getId()),471 slot.getStereotype(),472 lastStarted,473 session);474 })475 .collect(toImmutableSet());476 return new NodeStatus(477 getId(),478 externalUri,479 maxSessionCount,480 slots,481 isDraining() ? DRAINING : UP,482 heartbeatPeriod,483 getNodeVersion(),484 getOsInfo());485 }486 @Override487 public HealthCheck getHealthCheck() {488 return healthCheck;489 }490 @Override491 public void drain() {492 bus.fire(new NodeDrainStarted(getId()));493 draining = true;494 int currentSessionCount = getCurrentSessionCount();495 if (currentSessionCount == 0) {496 LOG.info("Firing node drain complete message");497 bus.fire(new NodeDrainComplete(getId()));498 } else {499 pendingSessions.set(currentSessionCount);500 }501 }502 private Map<String, Object> toJson() {503 return ImmutableMap.of(504 "id", getId(),505 "uri", externalUri,506 "maxSessions", maxSessionCount,507 "draining", isDraining(),508 "capabilities", factories.stream()509 .map(SessionSlot::getStereotype)510 .collect(Collectors.toSet()));511 }512 public static class Builder {513 private final Tracer tracer;514 private final EventBus bus;515 private final URI uri;516 private final URI gridUri;517 private final Secret registrationSecret;518 private final ImmutableList.Builder<SessionSlot> factories;519 private int maxCount = Runtime.getRuntime().availableProcessors() * 5;520 private Ticker ticker = Ticker.systemTicker();521 private Duration sessionTimeout = Duration.ofMinutes(5);...