// Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to // the same host. if (!call.get().forWebSocket) { AsyncCall existingCall = findExistingCallWithHost(call.host()); if (existingCall != null) call.reuseCallsPerHostFrom(existingCall); } } promoteAndExecute(); }
/** * Promotes eligible calls from {@link #readyAsyncCalls} to {@link #runningAsyncCalls} and runs * them on the executor service. Must not be called with synchronization because executing calls * can call into user code. * * @return true if the dispatcher is currently running calls. */ privatebooleanpromoteAndExecute(){ assert (!Thread.holdsLock(this));
List<AsyncCall> executableCalls = new ArrayList<>(); boolean isRunning; synchronized (this) { for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) { AsyncCall asyncCall = i.next(); //如果正在运行的Call的数量大于64 if (runningAsyncCalls.size() >= maxRequests) break; // Max capacity. if (asyncCall.callsPerHost().get() >= maxRequestsPerHost) continue; // Host max capacity.
public Response proceed(Request request, Transmitter transmitter, @Nullable Exchange exchange) throws IOException { if (index >= interceptors.size()) thrownew AssertionError();
calls++;
// If we already have a stream, confirm that the incoming request will use it. if (this.exchange != null && !this.exchange.connection().supportsUrl(request.url())) { thrownew IllegalStateException("network interceptor " + interceptors.get(index - 1) + " must retain the same host and port"); }
// If we already have a stream, confirm that this is the only call to chain.proceed(). if (this.exchange != null && calls > 1) { thrownew IllegalStateException("network interceptor " + interceptors.get(index - 1) + " must call proceed() exactly once"); }
// Call the next interceptor in the chain. //创建下一个chain 并传递给拦截器 RealInterceptorChain next = new RealInterceptorChain(interceptors, transmitter, exchange, index + 1, request, call, connectTimeout, readTimeout, writeTimeout); Interceptor interceptor = interceptors.get(index); //调用拦截器的intercept方法 在interceptor中会调用next的proceed执行下一个拦截器 Response response = interceptor.intercept(next);
// Confirm that the next interceptor made its required call to chain.proceed(). if (exchange != null && index + 1 < interceptors.size() && next.calls != 1) { thrownew IllegalStateException("network interceptor " + interceptor + " must call proceed() exactly once"); }
// Confirm that the intercepted response isn't null. if (response == null) { thrownew NullPointerException("interceptor " + interceptor + " returned null"); }
if (response.body() == null) { thrownew IllegalStateException( "interceptor " + interceptor + " returned a response with no body"); }
return response; }
建立连接
Transmitter
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
publicvoidprepareToConnect(Request request){ if (this.request != null) { if (sameConnection(this.request.url(), request.url()) && exchangeFinder.hasRouteToTry()) { return; // Already ready. } if (exchange != null) thrownew IllegalStateException();
/** Returns a new exchange to carry a new request and response. */ //Transmitter Exchange newExchange(Interceptor.Chain chain, boolean doExtensiveHealthChecks){ synchronized (connectionPool) { if (noMoreExchanges) { thrownew IllegalStateException("released"); } if (exchange != null) { thrownew IllegalStateException("cannot make a new request because the previous response " + "is still open: please call response.close()"); } } //获取ExchangeCodec ExchangeCodec codec = exchangeFinder.find(client, chain, doExtensiveHealthChecks); Exchange result = new Exchange(this, call, eventListener, exchangeFinder, codec);
/** * Returns a connection to host a new stream. This prefers the existing connection if it exists, * then the pool, finally building a new connection. */ private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled)throws IOException { boolean foundPooledConnection = false; RealConnection result = null; Route selectedRoute = null; RealConnection releasedConnection; Socket toClose; synchronized (connectionPool) { if (transmitter.isCanceled()) thrownew IOException("Canceled"); hasStreamFailure = false; // This is a fresh attempt.
// Attempt to use an already-allocated connection. We need to be careful here because our // already-allocated connection may have been restricted from creating new exchanges. releasedConnection = transmitter.connection; toClose = transmitter.connection != null && transmitter.connection.noNewExchanges ? transmitter.releaseConnectionNoEvents() : null;
if (transmitter.connection != null) { // We had an already-allocated connection and it's good. result = transmitter.connection; releasedConnection = null; }
if (result == null) { // Attempt to get a connection from the pool. if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, null, false)) { foundPooledConnection = true; result = transmitter.connection; } elseif (nextRouteToTry != null) { selectedRoute = nextRouteToTry; nextRouteToTry = null; } elseif (retryCurrentRoute()) { selectedRoute = transmitter.connection.route(); } } } closeQuietly(toClose);
if (releasedConnection != null) { eventListener.connectionReleased(call, releasedConnection); } if (foundPooledConnection) { eventListener.connectionAcquired(call, result); } if (result != null) { // If we found an already-allocated or pooled connection, we're done. return result; }
// If we need a route selection, make one. This is a blocking operation. boolean newRouteSelection = false; if (selectedRoute == null && (routeSelection == null || !routeSelection.hasNext())) { newRouteSelection = true; routeSelection = routeSelector.next(); }
if (newRouteSelection) { // Now that we have a set of IP addresses, make another attempt at getting a connection from // the pool. This could match due to connection coalescing. routes = routeSelection.getAll(); if (connectionPool.transmitterAcquirePooledConnection( address, transmitter, routes, false)) { foundPooledConnection = true; result = transmitter.connection; } }
if (!foundPooledConnection) { if (selectedRoute == null) { selectedRoute = routeSelection.next(); }
// Create a connection and assign it to this allocation immediately. This makes it possible // for an asynchronous cancel() to interrupt the handshake we're about to do. result = new RealConnection(connectionPool, selectedRoute); connectingConnection = result; } }
// If we found a pooled connection on the 2nd time around, we're done. if (foundPooledConnection) { eventListener.connectionAcquired(call, result); return result; }
// Do TCP + TLS handshakes. This is a blocking operation. //RealConnection 连接socket result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis, connectionRetryEnabled, call, eventListener); connectionPool.routeDatabase.connected(result.route());
Socket socket = null; synchronized (connectionPool) { connectingConnection = null; // Last attempt at connection coalescing, which only occurs if we attempted multiple // concurrent connections to the same host. if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, routes, true)) { // We lost the race! Close the connection we created and return the pooled connection. result.noNewExchanges = true; socket = result.socket(); result = transmitter.connection;
// It's possible for us to obtain a coalesced connection that is immediately unhealthy. In // that case we will retry the route we just successfully connected with. nextRouteToTry = selectedRoute; } else { connectionPool.put(result); transmitter.acquireConnectionNoEvents(result); } } closeQuietly(socket);
/** This is the last interceptor in the chain. It makes a network call to the server. */ publicfinalclassCallServerInterceptorimplementsInterceptor{ privatefinalboolean forWebSocket;
long sentRequestMillis = System.currentTimeMillis();
exchange.writeRequestHeaders(request); //写入请求头
boolean responseHeadersStarted = false; Response.Builder responseBuilder = null; if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) { // If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100 // Continue" response before transmitting the request body. If we don't get that, return // what we did get (such as a 4xx response) without ever transmitting the request body. if ("100-continue".equalsIgnoreCase(request.header("Expect"))) { exchange.flushRequest(); responseHeadersStarted = true; exchange.responseHeadersStart(); responseBuilder = exchange.readResponseHeaders(true); } //写入请求体 if (responseBuilder == null) { if (request.body().isDuplex()) { // Prepare a duplex body so that the application can send a request body later. exchange.flushRequest(); BufferedSink bufferedRequestBody = Okio.buffer( exchange.createRequestBody(request, true)); request.body().writeTo(bufferedRequestBody); } else { // Write the request body if the "Expect: 100-continue" expectation was met. BufferedSink bufferedRequestBody = Okio.buffer( exchange.createRequestBody(request, false)); request.body().writeTo(bufferedRequestBody); bufferedRequestBody.close(); } } else { exchange.noRequestBody(); if (!exchange.connection().isMultiplexed()) { // If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection // from being reused. Otherwise we're still obligated to transmit the request body to // leave the connection in a consistent state. exchange.noNewExchangesOnConnection(); } } } else { exchange.noRequestBody(); }
if (request.body() == null || !request.body().isDuplex()) { exchange.finishRequest(); }
if (!responseHeadersStarted) { exchange.responseHeadersStart(); }
if (responseBuilder == null) { responseBuilder = exchange.readResponseHeaders(false); }
int code = response.code(); if (code == 100) { // server sent a 100-continue even though we did not request one. // try again to read the actual response response = exchange.readResponseHeaders(false) .request(request) .handshake(exchange.connection().handshake()) .sentRequestAtMillis(sentRequestMillis) .receivedResponseAtMillis(System.currentTimeMillis()) .build();
code = response.code(); }
exchange.responseHeadersEnd(response);
if (forWebSocket && code == 101) { // Connection is upgrading, but we need to ensure interceptors see a non-null response body. response = response.newBuilder() .body(Util.EMPTY_RESPONSE) .build(); } else { response = response.newBuilder() .body(exchange.openResponseBody(response)) .build(); }
if ("close".equalsIgnoreCase(response.request().header("Connection")) || "close".equalsIgnoreCase(response.header("Connection"))) { exchange.noNewExchangesOnConnection(); }