...19import org.openqa.selenium.events.EventBus;20import org.openqa.selenium.grid.data.Availability;21import org.openqa.selenium.grid.data.NodeDrainComplete;22import org.openqa.selenium.grid.data.NodeDrainStarted;23import org.openqa.selenium.grid.data.NodeId;24import org.openqa.selenium.grid.data.NodeRemovedEvent;25import org.openqa.selenium.grid.data.NodeStatus;26import org.openqa.selenium.grid.data.NodeStatusEvent;27import org.openqa.selenium.grid.data.Session;28import org.openqa.selenium.grid.data.SessionClosedEvent;29import org.openqa.selenium.grid.data.Slot;30import org.openqa.selenium.grid.data.SlotId;31import org.openqa.selenium.grid.security.Secret;32import org.openqa.selenium.internal.Require;33import org.openqa.selenium.remote.SessionId;34import java.time.Instant;35import java.util.HashSet;36import java.util.Iterator;37import java.util.Map;38import java.util.Optional;39import java.util.Set;40import java.util.concurrent.ConcurrentHashMap;41import java.util.concurrent.locks.Lock;42import java.util.concurrent.locks.ReadWriteLock;43import java.util.concurrent.locks.ReentrantReadWriteLock;44import java.util.logging.Logger;45import static org.openqa.selenium.grid.data.Availability.DOWN;46import static org.openqa.selenium.grid.data.Availability.DRAINING;47import static org.openqa.selenium.grid.data.Availability.UP;48public class GridModel {49 private static final Logger LOG = Logger.getLogger(GridModel.class.getName());50 private static final SessionId RESERVED = new SessionId("reserved");51 private final ReadWriteLock lock = new ReentrantReadWriteLock(/* fair */ true);52 private final Map<Availability, Set<NodeStatus>> nodes = new ConcurrentHashMap<>();53 private final EventBus events;54 public GridModel(EventBus events, Secret registrationSecret) {55 this.events = Require.nonNull("Event bus", events);56 Require.nonNull("Registration secret", registrationSecret);57 events.addListener(NodeDrainStarted.listener(nodeId -> setAvailability(nodeId, DRAINING)));58 events.addListener(NodeDrainComplete.listener(this::remove));59 events.addListener(NodeRemovedEvent.listener(this::remove));60 events.addListener(NodeStatusEvent.listener(status -> refresh(status)));61 events.addListener(SessionClosedEvent.listener(this::release));62 }63 public GridModel add(NodeStatus node) {64 Require.nonNull("Node", node);65 Lock writeLock = lock.writeLock();66 writeLock.lock();67 try {68 // If we've already added the node, remove it.69 for (Set<NodeStatus> nodes : nodes.values()) {70 Iterator<NodeStatus> iterator = nodes.iterator();71 while (iterator.hasNext()) {72 NodeStatus next = iterator.next();73 // If the ID is the same, we're re-adding a node. If the URI is the same a node probably restarted74 if (next.getId().equals(node.getId()) || next.getUri().equals(node.getUri())) {75 LOG.info(String.format("Re-adding node with id %s and URI %s.", node.getId(), node.getUri()));76 iterator.remove();77 }78 }79 }80 // Nodes are initially added in the "down" state until something changes their availability81 nodes(DOWN).add(node);82 } finally {83 writeLock.unlock();84 }85 return this;86 }87 public GridModel refresh(NodeStatus status) {88 Require.nonNull("Node status", status);89 Lock writeLock = lock.writeLock();90 writeLock.lock();91 try {92 AvailabilityAndNode availabilityAndNode = findNode(status.getId());93 if (availabilityAndNode == null) {94 return this;95 }96 // if the node was marked as "down", keep it down until a healthcheck passes:97 // just because the node can hit the event bus doesn't mean it's reachable98 if (DOWN.equals(availabilityAndNode.availability)) {99 nodes(DOWN).remove(availabilityAndNode.status);100 nodes(DOWN).add(status);101 return this;102 }103 // But do trust the node if it tells us it's draining104 nodes(availabilityAndNode.availability).remove(availabilityAndNode.status);105 nodes(status.getAvailability()).add(status);106 return this;107 } finally {108 writeLock.unlock();109 }110 }111 public GridModel remove(NodeId id) {112 Require.nonNull("Node ID", id);113 Lock writeLock = lock.writeLock();114 writeLock.lock();115 try {116 AvailabilityAndNode availabilityAndNode = findNode(id);117 if (availabilityAndNode == null) {118 return this;119 }120 nodes(availabilityAndNode.availability).remove(availabilityAndNode.status);121 return this;122 } finally {123 writeLock.unlock();124 }125 }126 public Availability setAvailability(NodeId id, Availability availability) {127 Require.nonNull("Node ID", id);128 Require.nonNull("Availability", availability);129 Lock writeLock = lock.writeLock();130 writeLock.lock();131 try {132 AvailabilityAndNode availabilityAndNode = findNode(id);133 if (availabilityAndNode == null) {134 return DOWN;135 }136 if (availability.equals(availabilityAndNode.availability)) {137 return availability;138 }139 nodes(availabilityAndNode.availability).remove(availabilityAndNode.status);140 nodes(availability).add(availabilityAndNode.status);141 LOG.info(String.format(142 "Switching node %s (uri: %s) from %s to %s",143 id,144 availabilityAndNode.status.getUri(),145 availabilityAndNode.availability,146 availability));147 return availabilityAndNode.availability;148 } finally {149 writeLock.unlock();150 }151 }152 public boolean reserve(SlotId slotId) {153 Lock writeLock = lock.writeLock();154 writeLock.lock();155 try {156 AvailabilityAndNode node = findNode(slotId.getOwningNodeId());157 if (node == null) {158 LOG.warning(String.format("Asked to reserve slot on node %s, but unable to find node", slotId.getOwningNodeId()));159 return false;160 }161 if (!UP.equals(node.availability)) {162 LOG.warning(String.format(163 "Asked to reserve a slot on node %s, but not is %s",164 slotId.getOwningNodeId(),165 node.availability));166 return false;167 }168 Optional<Slot> maybeSlot = node.status.getSlots().stream()169 .filter(slot -> slotId.equals(slot.getId()))170 .findFirst();171 if (!maybeSlot.isPresent()) {172 LOG.warning(String.format(173 "Asked to reserve slot on node %s, but no slot with id %s found",174 node.status.getId(),175 slotId));176 return false;177 }178 reserve(node.status, maybeSlot.get());179 return true;180 } finally {181 writeLock.unlock();182 }183 }184 public Set<NodeStatus> getSnapshot() {185 Lock readLock = this.lock.readLock();186 readLock.lock();187 try {188 ImmutableSet.Builder<NodeStatus> snapshot = ImmutableSet.builder();189 for (Map.Entry<Availability, Set<NodeStatus>> entry : nodes.entrySet()) {190 entry.getValue().stream()191 .map(status -> rewrite(status, entry.getKey()))192 .forEach(snapshot::add);193 }194 return snapshot.build();195 } finally {196 readLock.unlock();197 }198 }199 private Set<NodeStatus> nodes(Availability availability) {200 return nodes.computeIfAbsent(availability, ignored -> new HashSet<>());201 }202 private AvailabilityAndNode findNode(NodeId id) {203 for (Map.Entry<Availability, Set<NodeStatus>> entry : nodes.entrySet()) {204 for (NodeStatus nodeStatus : entry.getValue()) {205 if (id.equals(nodeStatus.getId())) {206 return new AvailabilityAndNode(entry.getKey(), nodeStatus);207 }208 }209 }210 return null;211 }212 private NodeStatus rewrite(NodeStatus status, Availability availability) {213 return new NodeStatus(214 status.getId(),215 status.getUri(),216 status.getMaxSessionCount(),217 status.getSlots(),218 availability);219 }220 private void release(SessionId id) {221 if (id == null) {222 return;223 }224 Lock writeLock = lock.writeLock();225 writeLock.lock();226 try {227 for (Map.Entry<Availability, Set<NodeStatus>> entry : nodes.entrySet()) {228 for (NodeStatus node : entry.getValue()) {229 for (Slot slot : node.getSlots()) {230 if (!slot.getSession().isPresent()) {231 continue;232 }233 if (id.equals(slot.getSession().get().getId())) {234 Slot released = new Slot(235 slot.getId(),236 slot.getStereotype(),237 slot.getLastStarted(),238 Optional.empty());239 amend(entry.getKey(), node, released);240 return;241 }242 }243 }244 }245 } finally {246 writeLock.unlock();247 }248 }249 private void reserve(NodeStatus status, Slot slot) {250 Instant now = Instant.now();251 Slot reserved = new Slot(252 slot.getId(),253 slot.getStereotype(),254 now,255 Optional.of(new Session(256 RESERVED,257 status.getUri(),258 slot.getStereotype(),259 slot.getStereotype(),260 now)));261 amend(UP, status, reserved);262 }263 public void setSession(SlotId slotId, Session session) {264 Require.nonNull("Slot ID", slotId);265 AvailabilityAndNode node = findNode(slotId.getOwningNodeId());266 if (node == null) {267 LOG.warning("Grid model and reality have diverged. Unable to find node " + slotId.getOwningNodeId());268 return;269 }270 Optional<Slot> maybeSlot = node.status.getSlots().stream()271 .filter(slot -> slotId.equals(slot.getId()))272 .findFirst();273 if (!maybeSlot.isPresent()) {274 LOG.warning("Grid model and reality have diverged. Unable to find slot " + slotId);275 return;276 }277 Slot slot = maybeSlot.get();278 Optional<Session> maybeSession = slot.getSession();279 if (!maybeSession.isPresent()) {280 LOG.warning("Grid model and reality have diverged. Slot is not reserved. " + slotId);281 return;...