...160 this.regularly = new Regularly("Local Node: " + externalUri);161 regularly.submit(currentSessions::cleanUp, Duration.ofSeconds(30), Duration.ofSeconds(30));162 regularly.submit(tempFileSystems::cleanUp, Duration.ofSeconds(30), Duration.ofSeconds(30));163 bus.addListener(NodeAddedEvent.listener(nodeId -> {164 if (getId().equals(nodeId)) {165 // Lets avoid to create more than one "Regularly" when the Node registers again.166 if (!heartBeatStarted.getAndSet(true)) {167 regularly.submit(168 () -> NodeHeartBeatEvent(getStatus())), heartbeatPeriod, heartbeatPeriod);169 }170 }171 }));172 bus.addListener(SessionClosedEvent.listener(id -> {173 try {174 this.stop(id);175 } catch (NoSuchSessionException ignore) {176 }177 if (this.isDraining()) {178 int done = pendingSessions.decrementAndGet();179 if (done <= 0) {180"Firing node drain complete message");181 NodeDrainComplete(this.getId()));182 }183 }184 }));185 new JMXHelper().register(this);186 }187 public static Builder builder(188 Tracer tracer,189 EventBus bus,190 URI uri,191 URI gridUri,192 Secret registrationSecret) {193 return new Builder(tracer, bus, uri, gridUri, registrationSecret);194 }195 @Override196 public boolean isReady() {197 return bus.isReady();198 }199 @VisibleForTesting200 @ManagedAttribute(name = "CurrentSessions")201 public int getCurrentSessionCount() {202 // It seems wildly unlikely we'll overflow an int203 return Math.toIntExact(currentSessions.size());204 }205 @ManagedAttribute(name = "MaxSessions")206 public int getMaxSessionCount() {207 return maxSessionCount;208 }209 @ManagedAttribute(name = "Status")210 public Availability getAvailability() {211 return isDraining() ? DRAINING : UP;212 }213 @ManagedAttribute(name = "TotalSlots")214 public int getTotalSlots() {215 return factories.size();216 }217 @ManagedAttribute(name = "UsedSlots")218 public long getUsedSlots() {219 return -> !sessionSlot.isAvailable()).count();220 }221 @ManagedAttribute(name = "Load")222 public float getLoad() {223 long inUse = -> !sessionSlot.isAvailable()).count();224 return inUse / (float) maxSessionCount * 100f;225 }226 @ManagedAttribute(name = "RemoteNodeUri")227 public URI getExternalUri() {228 return this.getUri();229 }230 @ManagedAttribute(name = "GridUri")231 public URI getGridUri() {232 return this.gridUri;233 }234 @ManagedAttribute(name = "NodeId")235 public String getNodeId() {236 return getId().toString();237 }238 @Override239 public boolean isSupporting(Capabilities capabilities) {240 return factories.parallelStream().anyMatch(factory -> factory.test(capabilities));241 }242 @Override243 public Either<WebDriverException, CreateSessionResponse> newSession(CreateSessionRequest sessionRequest) {244 Require.nonNull("Session request", sessionRequest);245 try (Span span = tracer.getCurrentContext().createSpan("node.new_session")) {246 Map<String, EventAttributeValue> attributeMap = new HashMap<>();247 attributeMap248 .put(AttributeKey.LOGGER_CLASS.getKey(), EventAttribute.setValue(getClass().getName()));249 attributeMap.put("session.request.capabilities",250 EventAttribute.setValue(sessionRequest.getDesiredCapabilities().toString()));251 attributeMap.put("session.request.downstreamdialect",252 EventAttribute.setValue(sessionRequest.getDownstreamDialects().toString()));253 int currentSessionCount = getCurrentSessionCount();254 span.setAttribute("current.session.count", currentSessionCount);255 attributeMap.put("current.session.count", EventAttribute.setValue(currentSessionCount));256 if (getCurrentSessionCount() >= maxSessionCount) {257 span.setAttribute("error", true);258 span.setStatus(Status.RESOURCE_EXHAUSTED);259 attributeMap.put("max.session.count", EventAttribute.setValue(maxSessionCount));260 span.addEvent("Max session count reached", attributeMap);261 return Either.left(new RetrySessionRequestException("Max session count reached."));262 }263 if (isDraining()) {264 span.setStatus(Status.UNAVAILABLE.withDescription("The node is draining. Cannot accept new sessions."));265 return Either.left(266 new RetrySessionRequestException("The node is draining. Cannot accept new sessions."));267 }268 // Identify possible slots to use as quickly as possible to enable concurrent session starting269 SessionSlot slotToUse = null;270 synchronized (factories) {271 for (SessionSlot factory : factories) {272 if (!factory.isAvailable() || !factory.test(sessionRequest.getDesiredCapabilities())) {273 continue;274 }275 factory.reserve();276 slotToUse = factory;277 break;278 }279 }280 if (slotToUse == null) {281 span.setAttribute("error", true);282 span.setStatus(Status.NOT_FOUND);283 span.addEvent("No slot matched the requested capabilities. ", attributeMap);284 return Either.left(285 new RetrySessionRequestException("No slot matched the requested capabilities."));286 }287 Either<WebDriverException, ActiveSession> possibleSession = slotToUse.apply(sessionRequest);288 if (possibleSession.isRight()) {289 ActiveSession session = possibleSession.right();290 currentSessions.put(session.getId(), slotToUse);291 SessionId sessionId = session.getId();292 Capabilities caps = session.getCapabilities();293 SESSION_ID.accept(span, sessionId);294 CAPABILITIES.accept(span, caps);295 String downstream = session.getDownstreamDialect().toString();296 String upstream = session.getUpstreamDialect().toString();297 String sessionUri = session.getUri().toString();298 span.setAttribute(AttributeKey.DOWNSTREAM_DIALECT.getKey(), downstream);299 span.setAttribute(AttributeKey.UPSTREAM_DIALECT.getKey(), upstream);300 span.setAttribute(AttributeKey.SESSION_URI.getKey(), sessionUri);301 // The session we return has to look like it came from the node, since we might be dealing302 // with a webdriver implementation that only accepts connections from localhost303 Session externalSession = createExternalSession(304 session,305 externalUri,306 slotToUse.isSupportingCdp() || caps.getCapability("se:cdp") != null);307 return Either.right(new CreateSessionResponse(308 externalSession,309 getEncoder(session.getDownstreamDialect()).apply(externalSession)));310 } else {311 slotToUse.release();312 span.setAttribute("error", true);313 span.addEvent("Unable to create session with the driver", attributeMap);314 return Either.left(possibleSession.left());315 }316 }317 }318 @Override319 public boolean isSessionOwner(SessionId id) {320 Require.nonNull("Session ID", id);321 return currentSessions.getIfPresent(id) != null;322 }323 @Override324 public Session getSession(SessionId id) throws NoSuchSessionException {325 Require.nonNull("Session ID", id);326 SessionSlot slot = currentSessions.getIfPresent(id);327 if (slot == null) {328 throw new NoSuchSessionException("Cannot find session with id: " + id);329 }330 return createExternalSession(slot.getSession(), externalUri, slot.isSupportingCdp());331 }332 @Override333 public TemporaryFilesystem getTemporaryFilesystem(SessionId id) throws IOException {334 try {335 return tempFileSystems.get(id, () -> TemporaryFilesystem.getTmpFsBasedOn(336 TemporaryFilesystem.getDefaultTmpFS().createTempDir("session", id.toString())));337 } catch (ExecutionException e) {338 throw new IOException(e);339 }340 }341 @Override342 public HttpResponse executeWebDriverCommand(HttpRequest req) {343 // True enough to be good enough344 SessionId id = getSessionId(req.getUri()).map(SessionId::new)345 .orElseThrow(() -> new NoSuchSessionException("Cannot find session: " + req));346 SessionSlot slot = currentSessions.getIfPresent(id);347 if (slot == null) {348 throw new NoSuchSessionException("Cannot find session with id: " + id);349 }350 HttpResponse toReturn = slot.execute(req);351 if (req.getMethod() == DELETE && req.getUri().equals("/session/" + id)) {352 stop(id);353 }354 return toReturn;355 }356 @Override357 public HttpResponse uploadFile(HttpRequest req, SessionId id) {358 Map<String, Object> incoming = JSON.toType(string(req), Json.MAP_TYPE);359 File tempDir;360 try {361 TemporaryFilesystem tempfs = getTemporaryFilesystem(id);362 tempDir = tempfs.createTempDir("upload", "file");363 Zip.unzip((String) incoming.get("file"), tempDir);364 } catch (IOException e) {365 throw new UncheckedIOException(e);366 }367 // Select the first file368 File[] allFiles = tempDir.listFiles();369 if (allFiles == null) {370 throw new WebDriverException(371 String.format("Cannot access temporary directory for uploaded files %s", tempDir));372 }373 if (allFiles.length != 1) {374 throw new WebDriverException(375 String.format("Expected there to be only 1 file. There were: %s", allFiles.length));376 }377 ImmutableMap<String, Object> result = ImmutableMap.of(378 "value", allFiles[0].getAbsolutePath());379 return new HttpResponse().setContent(asJson(result));380 }381 @Override382 public void stop(SessionId id) throws NoSuchSessionException {383 Require.nonNull("Session ID", id);384 SessionSlot slot = currentSessions.getIfPresent(id);385 if (slot == null) {386 throw new NoSuchSessionException("Cannot find session with id: " + id);387 }388 killSession(slot);389 tempFileSystems.invalidate(id);390 }391 private Session createExternalSession(ActiveSession other, URI externalUri, boolean isSupportingCdp) {392 Capabilities toUse = ImmutableCapabilities.copyOf(other.getCapabilities());393 // Rewrite the se:options if necessary to send the cdp url back394 if (isSupportingCdp) {395 String cdpPath = String.format("/session/%s/se/cdp", other.getId());396 toUse = new PersistentCapabilities(toUse).setCapability("se:cdp", rewrite(cdpPath));397 }398 return new Session(other.getId(), externalUri, other.getStereotype(), toUse,;399 }400 private URI rewrite(String path) {401 try {402 return new URI(403 "ws",404 gridUri.getUserInfo(),405 gridUri.getHost(),406 gridUri.getPort(),407 path,408 null,409 null);410 } catch (URISyntaxException e) {411 throw new RuntimeException(e);412 }413 }414 private void killSession(SessionSlot slot) {415 currentSessions.invalidate(slot.getSession().getId());416 // Attempt to stop the session417 if (!slot.isAvailable()) {418 slot.stop();419 }420 }421 @Override422 public NodeStatus getStatus() {423 Set<Slot> slots = .map(slot -> {425 Instant lastStarted = Instant.EPOCH;426 Optional<Session> session = Optional.empty();427 if (!slot.isAvailable()) {428 ActiveSession activeSession = slot.getSession();429 if (activeSession != null) {430 lastStarted = activeSession.getStartTime();431 session = Optional.of(432 new Session(433 activeSession.getId(),434 activeSession.getUri(),435 slot.getStereotype(),436 activeSession.getCapabilities(),437 activeSession.getStartTime()));438 }439 }440 return new Slot(441 new SlotId(getId(), slot.getId()),442 slot.getStereotype(),443 lastStarted,444 session);445 })446 .collect(toImmutableSet());447 return new NodeStatus(448 getId(),449 externalUri,450 maxSessionCount,451 slots,452 isDraining() ? DRAINING : UP,453 heartbeatPeriod,454 getNodeVersion(),455 getOsInfo());456 }457 @Override458 public HealthCheck getHealthCheck() {459 return healthCheck;460 }461 @Override462 public void drain() {463 NodeDrainStarted(getId()));464 draining = true;465 int currentSessionCount = getCurrentSessionCount();466 if (currentSessionCount == 0) {467"Firing node drain complete message");468 NodeDrainComplete(getId()));469 } else {470 pendingSessions.set(currentSessionCount);471 }472 }473 private Map<String, Object> toJson() {474 return ImmutableMap.of(475 "id", getId(),476 "uri", externalUri,477 "maxSessions", maxSessionCount,478 "draining", isDraining(),479 "capabilities", .map(SessionSlot::getStereotype)481 .collect(Collectors.toSet()));482 }483 public static class Builder {484 private final Tracer tracer;485 private final EventBus bus;486 private final URI uri;487 private final URI gridUri;488 private final Secret registrationSecret;489 private final ImmutableList.Builder<SessionSlot> factories;...

...212 assertEquals(1, getStereotypes(nodeStatus).get(CAPS).intValue());213 // Craft a status that makes it look like the node is busy, and post it on the bus.214 NodeStatus status = node.getStatus();215 NodeStatus crafted = new NodeStatus(216 status.getId(),217 status.getUri(),218 status.getMaxSessionCount(),219 ImmutableSet.of(220 new Slot(221 new SlotId(status.getId(), UUID.randomUUID()),222 CAPS,223,224 Optional.of(new Session(225 new SessionId(UUID.randomUUID()), sessionUri, CAPS, CAPS,,226 UP,227 Duration.ofSeconds(10),228 status.getVersion(),229 status.getOsInfo());230 NodeStatusEvent(crafted));231 // We claimed the only slot is filled. Life is good.232 wait.until(obj -> !distributor.getStatus().hasCapacity());233 }234 private Map<Capabilities, Integer> getStereotypes(NodeStatus status) {235 Map<Capabilities, Integer> stereotypes = new HashMap<>();236 for (Slot slot : status.getSlots()) {237 int count = stereotypes.getOrDefault(slot.getStereotype(), 0);238 count++;239 stereotypes.put(slot.getStereotype(), count);240 }241 return ImmutableMap.copyOf(stereotypes);242 }243 static class CustomNode extends Node {244 private final EventBus bus;245 private final Function<Capabilities, Session> factory;246 private Session running;247 protected CustomNode(248 EventBus bus,249 NodeId nodeId,250 URI uri,251 Function<Capabilities, Session> factory) {252 super(DefaultTestTracer.createTracer(), nodeId, uri, registrationSecret);253 this.bus = bus;254 this.factory = Objects.requireNonNull(factory);255 }256 @Override257 public boolean isReady() {258 return true;259 }260 @Override261 public Either<WebDriverException, CreateSessionResponse> newSession(CreateSessionRequest sessionRequest) {262 Objects.requireNonNull(sessionRequest);263 if (running != null) {264 return Either.left(new SessionNotCreatedException("Session already exists"));265 }266 Session session = factory.apply(sessionRequest.getDesiredCapabilities());267 running = session;268 return Either.right(269 new CreateSessionResponse(270 session,271 CapabilityResponseEncoder.getEncoder(W3C).apply(session)));272 }273 @Override274 public HttpResponse executeWebDriverCommand(HttpRequest req) {275 throw new UnsupportedOperationException("executeWebDriverCommand");276 }277 @Override278 public HttpResponse uploadFile(HttpRequest req, SessionId id) {279 throw new UnsupportedOperationException("uploadFile");280 }281 @Override282 public Session getSession(SessionId id) throws NoSuchSessionException {283 if (running == null || !running.getId().equals(id)) {284 throw new NoSuchSessionException();285 }286 return running;287 }288 @Override289 public void stop(SessionId id) throws NoSuchSessionException {290 getSession(id);291 running = null;292 SessionClosedEvent(id));293 }294 @Override295 public boolean isSessionOwner(SessionId id) {296 return running != null && running.getId().equals(id);297 }298 @Override299 public boolean isSupporting(Capabilities capabilities) {300 return Objects.equals("cake", capabilities.getCapability("cheese"));301 }302 @Override303 public NodeStatus getStatus() {304 Session sess = null;305 if (running != null) {306 try {307 sess = new Session(308 running.getId(),309 new URI("http://localhost:14568"),310 CAPS,311 running.getCapabilities(),312;313 } catch (URISyntaxException e) {314 throw new RuntimeException(e);315 }316 }317 return new NodeStatus(318 getId(),319 getUri(),320 1,321 ImmutableSet.of(322 new Slot(323 new SlotId(getId(), UUID.randomUUID()),324 CAPS,325,326 Optional.ofNullable(sess))),327 UP,328 Duration.ofSeconds(10),329 getNodeVersion(),330 getOsInfo());331 }332 @Override333 public HealthCheck getHealthCheck() {334 return () -> new HealthCheck.Result(UP, "tl;dr");335 }336 @Override337 public void drain() {...

...154"Encoded response: " + JSON.toJson(ImmutableMap.of(155 "value", ImmutableMap.of(156 "sessionId", sessionId,157 "capabilities", capabilities))));158 NodeDrainStarted(getId()));159 return Optional.of(160 new CreateSessionResponse(161 getSession(sessionId),162 JSON.toJson(ImmutableMap.of(163 "value", ImmutableMap.of(164 "sessionId", sessionId,165 "capabilities", capabilities))).getBytes(UTF_8)));166 }167 private HttpClient extractHttpClient(RemoteWebDriver driver) {168 CommandExecutor executor = driver.getCommandExecutor();169 try {170 Field client = null;171 Class<?> current = executor.getClass();172 while (client == null && (current != null || Object.class.equals(current))) {173 client = findClientField(current);174 current = current.getSuperclass();175 }176 if (client == null) {177 throw new IllegalStateException("Unable to find client field in " + executor.getClass());178 }179 if (!HttpClient.class.isAssignableFrom(client.getType())) {180 throw new IllegalStateException("Client field is not assignable to http client");181 }182 client.setAccessible(true);183 return (HttpClient) client.get(executor);184 } catch (ReflectiveOperationException e) {185 throw new IllegalStateException(e);186 }187 }188 private Field findClientField(Class<?> clazz) {189 try {190 return clazz.getDeclaredField("client");191 } catch (NoSuchFieldException e) {192 return null;193 }194 }195 private Capabilities rewriteCapabilities(RemoteWebDriver driver) {196 // Rewrite the se:options if necessary197 Object rawSeleniumOptions = driver.getCapabilities().getCapability("se:options");198 if (rawSeleniumOptions == null || rawSeleniumOptions instanceof Map) {199 @SuppressWarnings("unchecked") Map<String, Object> original = (Map<String, Object>) rawSeleniumOptions;200 Map<String, Object> updated = new TreeMap<>(original == null ? new HashMap<>() : original);201 String cdpPath = String.format("/session/%s/se/cdp", driver.getSessionId());202 updated.put("cdp", rewrite(cdpPath));203 return new PersistentCapabilities(driver.getCapabilities()).setCapability("se:options", updated);204 }205 return ImmutableCapabilities.copyOf(driver.getCapabilities());206 }207 private URI rewrite(String path) {208 try {209 return new URI(210 gridUri.getScheme(),211 gridUri.getUserInfo(),212 gridUri.getHost(),213 gridUri.getPort(),214 path,215 null,216 null);217 } catch (URISyntaxException e) {218 throw new RuntimeException(e);219 }220 }221 @Override222 public HttpResponse executeWebDriverCommand(HttpRequest req) {223"Executing " + req);224 HttpResponse res = client.execute(req);225 if (DELETE.equals(req.getMethod()) && req.getUri().equals("/session/" + sessionId)) {226 // Ensure the response is sent before we viciously kill the node227 new Thread(228 () -> {229 try {230 Thread.sleep(500);231 } catch (InterruptedException e) {232 Thread.currentThread().interrupt();233 throw new RuntimeException(e);234 }235"Stopping session: " + sessionId);236 stop(sessionId);237 },238 "Node clean up: " + getId())239 .start();240 }241 return res;242 }243 @Override244 public Session getSession(SessionId id) throws NoSuchSessionException {245 if (!isSessionOwner(id)) {246 throw new NoSuchSessionException("Unable to find session with id: " + id);247 }248 return new Session(249 sessionId,250 getUri(),251 stereotype,252 capabilities,253 sessionStart); }254 @Override255 public HttpResponse uploadFile(HttpRequest req, SessionId id) {256 return null;257 }258 @Override259 public void stop(SessionId id) throws NoSuchSessionException {260"Stop has been called: " + id);261 Require.nonNull("Session ID", id);262 if (!isSessionOwner(id)) {263 throw new NoSuchSessionException("Unable to find session " + id);264 }265"Quitting session " + id);266 try {267 driver.quit();268 } catch (Exception e) {269 // It's possible that the driver has already quit.270 }271 SessionClosedEvent(id));272"Firing node drain complete message");273 NodeDrainComplete(getId()));274 }275 @Override276 public boolean isSessionOwner(SessionId id) {277 return driver != null && sessionId.equals(id);278 }279 @Override280 public boolean isSupporting(Capabilities capabilities) {281 return driverInfo.isSupporting(capabilities);282 }283 @Override284 public NodeStatus getStatus() {285 return new NodeStatus(286 getId(),287 getUri(),288 1,289 ImmutableSet.of(290 new Slot(291 new SlotId(getId(), slotId),292 stereotype,293 Instant.EPOCH,294 driver == null ?295 Optional.empty() :296 Optional.of(new Session(sessionId, getUri(), stereotype, capabilities,,297 isDraining() ? DRAINING : UP);298 }299 @Override300 public void drain() {301 NodeDrainStarted(getId()));302 draining = true;303 }304 @Override305 public HealthCheck getHealthCheck() {306 return () -> new HealthCheck.Result(isDraining() ? DRAINING : UP, "Everything is fine");307 }308 @Override309 public boolean isReady() {310 return events.isReady();311 }312}...

...70 Iterator<NodeStatus> iterator = nodes.iterator();71 while (iterator.hasNext()) {72 NodeStatus next =;73 // If the ID is the same, we're re-adding a node. If the URI is the same a node probably restarted74 if (next.getId().equals(node.getId()) || next.getUri().equals(node.getUri())) {75"Re-adding node with id %s and URI %s.", node.getId(), node.getUri()));76 iterator.remove();77 }78 }79 }80 // Nodes are initially added in the "down" state until something changes their availability81 nodes(DOWN).add(node);82 } finally {83 writeLock.unlock();84 }85 return this;86 }87 public GridModel refresh(NodeStatus status) {88 Require.nonNull("Node status", status);89 Lock writeLock = lock.writeLock();90 writeLock.lock();91 try {92 AvailabilityAndNode availabilityAndNode = findNode(status.getId());93 if (availabilityAndNode == null) {94 return this;95 }96 // if the node was marked as "down", keep it down until a healthcheck passes:97 // just because the node can hit the event bus doesn't mean it's reachable98 if (DOWN.equals(availabilityAndNode.availability)) {99 nodes(DOWN).remove(availabilityAndNode.status);100 nodes(DOWN).add(status);101 return this;102 }103 // But do trust the node if it tells us it's draining104 nodes(availabilityAndNode.availability).remove(availabilityAndNode.status);105 nodes(status.getAvailability()).add(status);106 return this;107 } finally {108 writeLock.unlock();109 }110 }111 public GridModel remove(NodeId id) {112 Require.nonNull("Node ID", id);113 Lock writeLock = lock.writeLock();114 writeLock.lock();115 try {116 AvailabilityAndNode availabilityAndNode = findNode(id);117 if (availabilityAndNode == null) {118 return this;119 }120 nodes(availabilityAndNode.availability).remove(availabilityAndNode.status);121 return this;122 } finally {123 writeLock.unlock();124 }125 }126 public Availability setAvailability(NodeId id, Availability availability) {127 Require.nonNull("Node ID", id);128 Require.nonNull("Availability", availability);129 Lock writeLock = lock.writeLock();130 writeLock.lock();131 try {132 AvailabilityAndNode availabilityAndNode = findNode(id);133 if (availabilityAndNode == null) {134 return DOWN;135 }136 if (availability.equals(availabilityAndNode.availability)) {137 return availability;138 }139 nodes(availabilityAndNode.availability).remove(availabilityAndNode.status);140 nodes(availability).add(availabilityAndNode.status);141 "Switching node %s (uri: %s) from %s to %s",143 id,144 availabilityAndNode.status.getUri(),145 availabilityAndNode.availability,146 availability));147 return availabilityAndNode.availability;148 } finally {149 writeLock.unlock();150 }151 }152 public boolean reserve(SlotId slotId) {153 Lock writeLock = lock.writeLock();154 writeLock.lock();155 try {156 AvailabilityAndNode node = findNode(slotId.getOwningNodeId());157 if (node == null) {158 LOG.warning(String.format("Asked to reserve slot on node %s, but unable to find node", slotId.getOwningNodeId()));159 return false;160 }161 if (!UP.equals(node.availability)) {162 LOG.warning(String.format(163 "Asked to reserve a slot on node %s, but not is %s",164 slotId.getOwningNodeId(),165 node.availability));166 return false;167 }168 Optional<Slot> maybeSlot = node.status.getSlots().stream()169 .filter(slot -> slotId.equals(slot.getId()))170 .findFirst();171 if (!maybeSlot.isPresent()) {172 LOG.warning(String.format(173 "Asked to reserve slot on node %s, but no slot with id %s found",174 node.status.getId(),175 slotId));176 return false;177 }178 reserve(node.status, maybeSlot.get());179 return true;180 } finally {181 writeLock.unlock();182 }183 }184 public Set<NodeStatus> getSnapshot() {185 Lock readLock = this.lock.readLock();186 readLock.lock();187 try {188 ImmutableSet.Builder<NodeStatus> snapshot = ImmutableSet.builder();189 for (Map.Entry<Availability, Set<NodeStatus>> entry : nodes.entrySet()) {190 entry.getValue().stream()191 .map(status -> rewrite(status, entry.getKey()))192 .forEach(snapshot::add);193 }194 return;195 } finally {196 readLock.unlock();197 }198 }199 private Set<NodeStatus> nodes(Availability availability) {200 return nodes.computeIfAbsent(availability, ignored -> new HashSet<>());201 }202 private AvailabilityAndNode findNode(NodeId id) {203 for (Map.Entry<Availability, Set<NodeStatus>> entry : nodes.entrySet()) {204 for (NodeStatus nodeStatus : entry.getValue()) {205 if (id.equals(nodeStatus.getId())) {206 return new AvailabilityAndNode(entry.getKey(), nodeStatus);207 }208 }209 }210 return null;211 }212 private NodeStatus rewrite(NodeStatus status, Availability availability) {213 return new NodeStatus(214 status.getId(),215 status.getUri(),216 status.getMaxSessionCount(),217 status.getSlots(),218 availability);219 }220 private void release(SessionId id) {221 if (id == null) {222 return;223 }224 Lock writeLock = lock.writeLock();225 writeLock.lock();226 try {227 for (Map.Entry<Availability, Set<NodeStatus>> entry : nodes.entrySet()) {228 for (NodeStatus node : entry.getValue()) {229 for (Slot slot : node.getSlots()) {230 if (!slot.getSession().isPresent()) {231 continue;232 }233 if (id.equals(slot.getSession().get().getId())) {234 Slot released = new Slot(235 slot.getId(),236 slot.getStereotype(),237 slot.getLastStarted(),238 Optional.empty());239 amend(entry.getKey(), node, released);240 return;241 }242 }243 }244 }245 } finally {246 writeLock.unlock();247 }248 }249 private void reserve(NodeStatus status, Slot slot) {250 Instant now =;251 Slot reserved = new Slot(252 slot.getId(),253 slot.getStereotype(),254 now,255 Optional.of(new Session(256 RESERVED,257 status.getUri(),258 slot.getStereotype(),259 slot.getStereotype(),260 now)));261 amend(UP, status, reserved);262 }263 public void setSession(SlotId slotId, Session session) {264 Require.nonNull("Slot ID", slotId);265 AvailabilityAndNode node = findNode(slotId.getOwningNodeId());266 if (node == null) {267 LOG.warning("Grid model and reality have diverged. Unable to find node " + slotId.getOwningNodeId());268 return;269 }270 Optional<Slot> maybeSlot = node.status.getSlots().stream()271 .filter(slot -> slotId.equals(slot.getId()))272 .findFirst();273 if (!maybeSlot.isPresent()) {274 LOG.warning("Grid model and reality have diverged. Unable to find slot " + slotId);275 return;276 }277 Slot slot = maybeSlot.get();278 Optional<Session> maybeSession = slot.getSession();279 if (!maybeSession.isPresent()) {280 LOG.warning("Grid model and reality have diverged. Slot is not reserved. " + slotId);281 return;282 }283 Session current = maybeSession.get();284 if (!RESERVED.equals(current.getId())) {285 LOG.warning("Gid model and reality have diverged. Slot has session and is not reserved. " + slotId);286 return;287 }288 Slot updated = new Slot(289 slot.getId(),290 slot.getStereotype(),291 session == null ? slot.getLastStarted() : session.getStartTime(),292 Optional.ofNullable(session));293 amend(node.availability, node.status, updated);294 }295 private void amend(Availability availability, NodeStatus status, Slot slot) {296 Set<Slot> newSlots = new HashSet<>(status.getSlots());297 newSlots.removeIf(s -> s.getId().equals(slot.getId()));298 newSlots.add(slot);299 nodes(availability).remove(status);300 nodes(availability).add(new NodeStatus(301 status.getId(),302 status.getUri(),303 status.getMaxSessionCount(),304 newSlots,305 status.getAvailability()));306 }307 private static class AvailabilityAndNode {308 public final Availability availability;309 public final NodeStatus status;310 public AvailabilityAndNode(Availability availability, NodeStatus status) {311 this.availability = availability;312 this.status = status;313 }314 }315}...

...118 Require.nonNull("Node", status);119 Lock writeLock = lock.writeLock();120 writeLock.lock();121 try {122 if (nodes.containsKey(status.getId())) {123 return;124 }125 Set<Capabilities> capabilities = status.getSlots().stream()126 .map(Slot::getStereotype)127 .map(ImmutableCapabilities::copyOf)128 .collect(toImmutableSet());129 // A new node! Add this as a remote node, since we've not called add130 RemoteNode remoteNode = new RemoteNode(131 tracer,132 clientFactory,133 status.getId(),134 status.getUri(),135 registrationSecret,136 capabilities);137 add(remoteNode);138 } finally {139 writeLock.unlock();140 }141 }142 @Override143 public LocalDistributor add(Node node) {144 Require.nonNull("Node", node);145"Added node %s at %s.", node.getId(), node.getUri()));146 nodes.put(node.getId(), node);147 model.add(node.getStatus());148 // Extract the health check149 Runnable runnableHealthCheck = asRunnableHealthCheck(node);150 allChecks.put(node.getId(), runnableHealthCheck);151 hostChecker.submit(runnableHealthCheck, Duration.ofMinutes(5), Duration.ofSeconds(30));152 NodeAddedEvent(node.getId()));153 return this;154 }155 private Runnable asRunnableHealthCheck(Node node) {156 HealthCheck healthCheck = node.getHealthCheck();157 NodeId id = node.getId();158 return () -> {159 HealthCheck.Result result;160 try {161 result = healthCheck.check();162 } catch (Exception e) {163 LOG.log(Level.WARNING, "Unable to process node " + id, e);164 result = new HealthCheck.Result(DOWN, "Unable to run healthcheck. Assuming down");165 }166 Lock writeLock = lock.writeLock();167 writeLock.lock();168 try {169 model.setAvailability(id, result.getAvailability());170 } finally {171 writeLock.unlock();...

...60 private Set<Slot> slots;61 private int maxSessionCount;62 public Host(EventBus bus, Node node) {63 this.node = Require.nonNull("Node", node);64 this.nodeId = node.getId();65 this.uri = node.getUri();66 this.status = DOWN;67 this.slots = ImmutableSet.of();68 HealthCheck healthCheck = node.getHealthCheck();69 this.performHealthCheck = () -> {70 HealthCheck.Result result = healthCheck.check();71 Availability current = result.isAlive() ? UP : DOWN;72 Availability previous = setHostStatus(current);73 //If the node has been set to maintenance mode, set the status here as draining74 if (node.isDraining() || previous == DRAINING) {75 // We want to continue to allow the node to drain.76 setHostStatus(DRAINING);77 return;78 }79 if (current != previous) {80 "Changing status of node %s from %s to %s. Reason: %s",82 node.getId(),83 previous,84 current,85 result.getMessage()));86 }87 };88 bus.addListener(SESSION_CLOSED, event -> {89 SessionId id = event.getData(SessionId.class);90 this.slots.forEach(slot -> slot.onEnd(id));91 });92 update(node.getStatus());93 }94 public void update(NodeStatus status) {95 Require.nonNull("Node status", status);96 Lock writeLock = lock.writeLock();97 writeLock.lock();98 try {99 this.slots = status.getSlots().stream()100 .map(slot -> new Slot(node, slot.getStereotype(), slot.getSession().isPresent() ? ACTIVE : AVAILABLE))101 .collect(toImmutableSet());102 // By definition, we can never have more sessions than we have slots available103 this.maxSessionCount = Math.min(this.slots.size(), status.getMaxSessionCount());104 } finally {105 writeLock.unlock();106 }107 }108 public NodeId getId() {109 return nodeId;110 }111 public DistributorStatus.NodeSummary asSummary() {112 Map<Capabilities, Integer> stereotypes = new HashMap<>();113 Map<Capabilities, Integer> used = new HashMap<>();114 Set<Session> activeSessions = new HashSet<>();115 slots.forEach(slot -> {116 stereotypes.compute(slot.getStereotype(), (key, curr) -> curr == null ? 1 : curr + 1);117 if (slot.getStatus() != AVAILABLE) {118 used.compute(slot.getStereotype(), (key, curr) -> curr == null ? 1 : curr + 1);119 activeSessions.add(slot.getCurrentSession());120 }121 });122 return new DistributorStatus.NodeSummary(...

...79 status.getOsInfo().get("arch"),80 status.getOsInfo().get("name"),81 status.getOsInfo().get("version"));82 toReturn.add(new Node(83 status.getId(),84 status.getUri(),85 status.getAvailability(),86 status.getMaxSessionCount(),87 status.getSlots().size(),88 stereotypes,89 sessions,90 status.getVersion(),91 osInfo));92 }93 return;94 }95 public int getNodeCount() {96 return distributorStatus.get().getNodes().size();97 }98 public int getSessionCount() {99 return distributorStatus.get().getNodes().stream()100 .map(NodeStatus::getSlots)101 .flatMap(Collection::stream)102 .filter(slot -> slot.getSession().isPresent())103 .mapToInt(slot -> 1)104 .sum();105 }106 public int getTotalSlots() {107 return distributorStatus.get().getNodes().stream()108 .mapToInt(status -> status.getSlots().size())109 .sum();110 }111 public int getMaxSession() {112 return distributorStatus.get().getNodes().stream()113 .mapToInt(NodeStatus::getMaxSessionCount)114 .sum();115 }116 public int getSessionQueueSize() {117 return queueInfoList.size();118 }119 public List<String> getSessionQueueRequests() {120 // TODO: The Grid UI expects there to be a single capability per new session request, which is not correct121 return .map(set -> set.isEmpty() ? new ImmutableCapabilities() : set.iterator().next())123 .map(JSON::toJson)124 .collect(Collectors.toList());125 }126 public List<Session> getSessions() {127 List<Session> sessions = new ArrayList<>();128 for (NodeStatus status : distributorStatus.get().getNodes()) {129 for (Slot slot : status.getSlots()) {130 if (slot.getSession().isPresent()) {131 session = slot.getSession().get();132 sessions.add(133 new org.openqa.selenium.grid.graphql.Session(134 session.getId().toString(),135 session.getCapabilities(),136 session.getStartTime(),137 session.getUri(),138 status.getId().toString(),139 status.getUri(),140 slot)141 );142 }143 }144 }145 return sessions;146 }147}...

...41 SessionInSlot currentSession = findSession(sessionId, nodeStatuses);42 if (currentSession != null) {43 session = currentSession.session;44 return new org.openqa.selenium.grid.graphql.Session(45 session.getId().toString(),46 session.getCapabilities(),47 session.getStartTime(),48 session.getUri(),49 currentSession.node.getId().toString(),50 currentSession.node.getUri(),51 currentSession.slot);52 } else {53 throw new SessionNotFoundException("No ongoing session found with the requested session id.",54 sessionId);55 }56 }57 private SessionInSlot findSession(String sessionId, Set<NodeStatus> nodeStatuses) {58 for (NodeStatus status : nodeStatuses) {59 for (Slot slot : status.getSlots()) {60 Optional<> session = slot.getSession();61 if (session.isPresent() && sessionId.equals(session.get().getId().toString())) {62 return new SessionInSlot(session.get(), status, slot);63 }64 }65 }66 return null;67 }68 private static class SessionInSlot {69 private final session;70 private final NodeStatus node;71 private final Slot slot;72 SessionInSlot( session, NodeStatus node, Slot slot) {73 this.session = session;74 this.node = node;75 this.slot = slot;...

1import;2Slot slot = new Slot();3String slotId = slot.getId();4import;5Slot slot = new Slot();6String slotUri = slot.getUri();7import;8Slot slot = new Slot();9String slotUri = slot.getUri();10import;11Slot slot = new Slot();12String slotUri = slot.getUri();13import;14Slot slot = new Slot();15String slotUri = slot.getUri();16import;17Slot slot = new Slot();18String slotUri = slot.getUri();19import;20Slot slot = new Slot();21String slotUri = slot.getUri();22import;23Slot slot = new Slot();24String slotUri = slot.getUri();25import;26Slot slot = new Slot();27String slotUri = slot.getUri();28import;29Slot slot = new Slot();30String slotUri = slot.getUri();31import;32Slot slot = new Slot();33String slotUri = slot.getUri();

1String slotId = slot.getId();2System.out.println(slotId);3String slotUri = slot.getUri();4System.out.println(slotUri);5Capabilities slotCapabilities = slot.getCapabilities();6System.out.println(slotCapabilities);7Instant slotStartTime = slot.getStartTime();8System.out.println(slotStartTime);9Instant slotEndTime = slot.getEndTime();10System.out.println(slotEndTime);11Duration slotInactivityTimeout = slot.getInactivityTimeout();12System.out.println(slotInactivityTimeout);13Instant slotCreationTime = slot.getCreationTime();14System.out.println(slotCreationTime);15Map<String, String> slotMetadata = slot.getMetadata();16System.out.println(slotMetadata);17Distributor slotDistributor = slot.getDistributor();18System.out.println(slotDistributor);19String slotDistributorId = slot.getDistributorId();20System.out.println(slotDistributorId);21String slotDistributorUri = slot.getDistributorUri();22System.out.println(slotDistributorUri);23Map<String, String> slotDistributorMetadata = slot.getDistributorMetadata();24System.out.println(slotDistributorMetadata);

1import;2import;3SlotId slotId = new SlotId("slotId");4Slot slot = new Slot(slotId, "slotUri", "slotStatus");5slotId = slot.getId();6System.out.println("Slot id is: " + slotId);

