Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor connection retry logic for Http sink #151

Merged
merged 1 commit into from
Sep 24, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -522,7 +522,8 @@ public void publish(Object payload, DynamicOptions dynamicOptions, State state)
}
}

private void sendOauthRequest(Object payload, DynamicOptions dynamicOptions, List<Header> headersList) {
private void sendOauthRequest(Object payload, DynamicOptions dynamicOptions, List<Header> headersList)
throws ConnectionUnavailableException {
//generate encoded base64 auth for getting refresh token
String consumerKeyValue = consumerKey + ":" + consumerSecret;
String encodedAuth = "Basic " + encodeBase64(consumerKeyValue);
Expand All @@ -543,13 +544,13 @@ private void sendOauthRequest(Object payload, DynamicOptions dynamicOptions, Lis
} else {
log.error("Error at sending oauth request to API endpoint: " +
publisherURL + "', with response code: " + response + ". Message dropped. Message dropped.");
throw new HttpSinkAdaptorRuntimeException("Error at sending oauth request to API endpoint: " +
throw new ConnectionUnavailableException("Error at sending oauth request to API endpoint: " +
publisherURL + "', with response code: " + response + ". Message dropped. Message dropped.");
}
}

private void handleOAuthFailure(Object payload, DynamicOptions dynamicOptions, List<Header> headersList,
String encodedAuth) {
String encodedAuth) throws ConnectionUnavailableException {

Boolean checkFromCache = accessTokenCache.checkAvailableKey(encodedAuth);
if (checkFromCache) {
Expand All @@ -560,7 +561,7 @@ private void handleOAuthFailure(Object payload, DynamicOptions dynamicOptions, L
}

private void getNewAccessTokenWithCache(Object payload, DynamicOptions dynamicOptions, List<Header> headersList,
String encodedAuth) {
String encodedAuth) throws ConnectionUnavailableException {
String accessToken = accessTokenCache.getAccessToken(encodedAuth);
for (Header header : headersList) {
if (header.getName().equals(HttpConstants.AUTHORIZATION_HEADER)) {
Expand All @@ -582,13 +583,13 @@ private void getNewAccessTokenWithCache(Object payload, DynamicOptions dynamicOp
} else {
log.error("Error at sending oauth request to API endpoint " + publisherURL + "', with response code: " +
response + ". Message dropped. ");
throw new HttpSinkAdaptorRuntimeException("Error at sending oauth request to API endpoint " +
throw new ConnectionUnavailableException("Error at sending oauth request to API endpoint " +
publisherURL + "', with response code: " + response + ". Message dropped.");
}
}

private void requestForNewAccessToken(Object payload, DynamicOptions dynamicOptions, List<Header> headersList,
String encodedAuth) {
String encodedAuth) throws ConnectionUnavailableException {
Boolean checkRefreshToken = accessTokenCache.checkRefreshAvailableKey(encodedAuth);
if (checkRefreshToken) {
for (Header header : headersList) {
Expand Down Expand Up @@ -633,7 +634,7 @@ private void requestForNewAccessToken(Object payload, DynamicOptions dynamicOpti
} else {
log.error("Error at sending oauth request to API endpoint " + publisherURL + "', with response code: " +
response + ". Message dropped.");
throw new HttpSinkAdaptorRuntimeException("Error at sending oauth request to API endpoint " +
throw new ConnectionUnavailableException("Error at sending oauth request to API endpoint " +
publisherURL + "', with response code: " + response + ". Message dropped.");
}

Expand All @@ -648,13 +649,14 @@ private void requestForNewAccessToken(Object payload, DynamicOptions dynamicOpti
} else {
log.error("Failed to generate new access token for the expired access token. Error code: " +
accessTokenCache.getResponseCode(encodedAuth) + ". Message dropped.");
throw new HttpSinkAdaptorRuntimeException("Failed to generate new access token for the expired" +
throw new ConnectionUnavailableException("Failed to generate new access token for the expired" +
" access token. Error code: " + accessTokenCache.getResponseCode(encodedAuth) +
". Message dropped.");
}
}

private int sendRequest(Object payload, DynamicOptions dynamicOptions, List<Header> headersList, int tryCount) {
private int sendRequest(Object payload, DynamicOptions dynamicOptions, List<Header> headersList, int tryCount)
throws ConnectionUnavailableException {
if (!publisherURLOption.isStatic()) {
super.initClientConnector(dynamicOptions);
}
Expand Down Expand Up @@ -688,13 +690,13 @@ private int sendRequest(Object payload, DynamicOptions dynamicOptions, List<Head
boolean latchCount = latch.await(30, TimeUnit.SECONDS);
if (!latchCount) {
log.debug("Time out due to getting getting response from " + publisherURL + ". Message dropped.");
throw new HttpSinkAdaptorRuntimeException("Time out due to getting getting response from "
throw new ConnectionUnavailableException("Time out due to getting getting response from "
+ publisherURL + ". Message dropped.");

}
} catch (InterruptedException e) {
log.debug("Failed to get a response from " + publisherURL + "," + e + ". Message dropped.");
throw new HttpSinkAdaptorRuntimeException("Failed to get a response from " +
throw new ConnectionUnavailableException("Failed to get a response from " +
publisherURL + ", " + e + ". Message dropped.");
}
if (isBlockingIO) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,13 +168,6 @@
type = {DataType.STRING},
optional = true,
defaultValue = "-"),
@Parameter(
name = "blocking.io",
description = "Blocks the request thread until a response it received from HTTP " +
"endpoint. This should be enabled for reliable messaging (error handling)",
type = {DataType.BOOL},
optional = true,
defaultValue = "false"),
@Parameter(
name = "headers",
description = "HTTP request headers in format `\"'<key>:<value>','<key>:<value>'\"`.\n" +
Expand Down Expand Up @@ -475,7 +468,6 @@ public class HttpSink extends Sink {
private long maxWaitTime;
private String hostnameVerificationEnabled;
private String sslVerificationDisabled;
private boolean isBlockingIO;

private DefaultHttpWsConnectorFactory httpConnectorFactory;

Expand Down Expand Up @@ -595,9 +587,6 @@ protected StateFactory init(StreamDefinition outputStreamDefinition, OptionHolde
authType = HttpConstants.NO_AUTH;
}

isBlockingIO = Boolean.parseBoolean(
optionHolder.validateAndGetStaticValue(HttpConstants.BLOCKING_IO, HttpConstants.FALSE));

initConnectorFactory();
if (publisherURLOption.isStatic()) {
initClientConnector(null);
Expand Down Expand Up @@ -632,7 +621,8 @@ public void publish(Object payload, DynamicOptions dynamicOptions, State state)
}
}

private void sendOauthRequest(Object payload, DynamicOptions dynamicOptions, List<Header> headersList) {
private void sendOauthRequest(Object payload, DynamicOptions dynamicOptions, List<Header> headersList)
throws ConnectionUnavailableException {
//generate encoded base64 auth for getting refresh token
String consumerKeyValue = consumerKey + ":" + consumerSecret;
String encodedAuth = "Basic " + encodeBase64(consumerKeyValue)
Expand All @@ -654,13 +644,13 @@ private void sendOauthRequest(Object payload, DynamicOptions dynamicOptions, Lis
} else {
log.error("Error at sending oauth request to API endpoint " +
publisherURL + "', with response code: " + response + ". Message dropped.");
throw new HttpSinkAdaptorRuntimeException("Error at sending oauth request to API endpoint " +
throw new ConnectionUnavailableException("Error at sending oauth request to API endpoint " +
publisherURL + "', and response code: " + response + ". Message dropped.");
}
}

private void handleOAuthFailure(Object payload, DynamicOptions dynamicOptions, List<Header> headersList,
String encodedAuth) {
String encodedAuth) throws ConnectionUnavailableException {
boolean checkFromCache = accessTokenCache.checkAvailableKey(encodedAuth);
if (checkFromCache) {
getNewAccessTokenWithCache(payload, dynamicOptions, headersList, encodedAuth);
Expand All @@ -670,7 +660,7 @@ private void handleOAuthFailure(Object payload, DynamicOptions dynamicOptions, L
}

private void getNewAccessTokenWithCache(Object payload, DynamicOptions dynamicOptions, List<Header> headersList,
String encodedAuth) {
String encodedAuth) throws ConnectionUnavailableException {
String accessToken = accessTokenCache.getAccessToken(encodedAuth);
for (Header header : headersList) {
if (header.getName().equals(HttpConstants.AUTHORIZATION_HEADER)) {
Expand All @@ -692,13 +682,13 @@ private void getNewAccessTokenWithCache(Object payload, DynamicOptions dynamicOp
} else {
log.error("Error at sending oauth request to API endpoint " + publisherURL + "', with response code: " +
response + ". Message dropped.");
throw new HttpSinkAdaptorRuntimeException("Error at sending oauth request to API endpoint " + publisherURL +
throw new ConnectionUnavailableException("Error at sending oauth request to API endpoint " + publisherURL +
"', with response code: " + response + ". Message dropped.");
}
}

private void requestForNewAccessToken(Object payload, DynamicOptions dynamicOptions, List<Header> headersList,
String encodedAuth) {
String encodedAuth) throws ConnectionUnavailableException {
Boolean checkRefreshToken = accessTokenCache.checkRefreshAvailableKey(encodedAuth);
if (checkRefreshToken) {
for (Header header : headersList) {
Expand Down Expand Up @@ -744,7 +734,7 @@ private void requestForNewAccessToken(Object payload, DynamicOptions dynamicOpti
} else {
log.error("Error at sending oauth request to API endpoint " + publisherURL + "', with response code: " +
response + ". Message dropped.");
throw new HttpSinkAdaptorRuntimeException("Error at sending oauth request to API endpoint " +
throw new ConnectionUnavailableException("Error at sending oauth request to API endpoint " +
publisherURL + "', with response code: " + response + ". Message dropped.");
}
} else if (accessTokenCache.getResponseCode(encodedAuth) == HttpConstants.AUTHENTICATION_FAIL_CODE) {
Expand All @@ -758,7 +748,7 @@ private void requestForNewAccessToken(Object payload, DynamicOptions dynamicOpti
} else {
log.error("Failed to generate new access token for the expired access token. Error code: " +
accessTokenCache.getResponseCode(encodedAuth) + ". Message dropped.");
throw new HttpSinkAdaptorRuntimeException("Failed to generate new access token for the expired" +
throw new ConnectionUnavailableException("Failed to generate new access token for the expired" +
" access token. Error code: " + accessTokenCache.getResponseCode(encodedAuth)
+ ". Message dropped.");
}
Expand All @@ -782,7 +772,7 @@ void getAccessToken(DynamicOptions dynamicOptions, String encodedAuth, String to
}

void setAccessToken(String encodedAuth, DynamicOptions dynamicOptions,
List<Header> headersList) {
List<Header> headersList) throws ConnectionUnavailableException {
//check the availability of the authorization
String accessToken;
boolean authAvailability = false;
Expand Down Expand Up @@ -823,7 +813,7 @@ void setAccessToken(String encodedAuth, DynamicOptions dynamicOptions,
} else {
log.error("Failed to generate new access token for the expired access token. Error code: " +
accessTokenCache.getResponseCode(encodedAuth) + ". Message dropped.");
throw new HttpSinkAdaptorRuntimeException("Failed to generate new access token for the expired" +
throw new ConnectionUnavailableException("Failed to generate new access token for the expired" +
" access token. Error code: " + accessTokenCache.getResponseCode(encodedAuth) +
". Message dropped.");
}
Expand All @@ -841,7 +831,8 @@ void setAccessToken(String encodedAuth, DynamicOptions dynamicOptions,
}
}

private int sendRequest(Object payload, DynamicOptions dynamicOptions, List<Header> headersList) {
private int sendRequest(Object payload, DynamicOptions dynamicOptions, List<Header> headersList)
throws ConnectionUnavailableException {
if (!publisherURLOption.isStatic()) {
initClientConnector(dynamicOptions);
}
Expand Down Expand Up @@ -872,45 +863,21 @@ private int sendRequest(Object payload, DynamicOptions dynamicOptions, List<Head
boolean latchCount = latch.await(30, TimeUnit.SECONDS);
if (!latchCount) {
log.debug("Time out due to getting getting response from " + publisherURL + ". Message dropped.");
throw new HttpSinkAdaptorRuntimeException("Time out due to getting getting response from "
throw new ConnectionUnavailableException("Time out due to getting getting response from "
+ publisherURL + ". Message dropped.");
}
} catch (InterruptedException e) {
log.debug("Failed to get a response from " + publisherURL + "," + e + ". Message dropped.");
throw new HttpSinkAdaptorRuntimeException("Failed to get a response from " +
throw new ConnectionUnavailableException("Failed to get a response from " +
publisherURL + ", " + e + ". Message dropped.");
}
HttpCarbonMessage response = listener.getHttpResponseMessage();
return response.getNettyHttpResponse().status().code();
} else if (!isBlockingIO) {
clientConnector.send(cMessage);
return HttpConstants.SUCCESS_CODE;
} else {
CountDownLatch latch = new CountDownLatch(1);
HttpResponseFuture responseFuture = clientConnector.send(cMessage);
HTTPResponseListener responseListener = new HTTPResponseListener(latch);
HTTPResponseListener responseListener = new HTTPResponseListener(payload, dynamicOptions, this);
responseFuture.setHttpConnectorListener(responseListener);

try {
boolean latchCount = latch.await(30, TimeUnit.SECONDS);
if (!latchCount) {
log.debug("Time out due to getting getting response from " + publisherURL + ". Message dropped.");
throw new HttpSinkAdaptorRuntimeException("Time out due to getting getting response from "
+ publisherURL + ". Message dropped.");

}
} catch (InterruptedException e) {
log.debug("Failed to get a response from " + publisherURL + "," + e + ". Message dropped.");
throw new HttpSinkAdaptorRuntimeException("Failed to get a response from " +
publisherURL + ", " + e + ". Message dropped.");
}

if (responseListener.throwable == null) {
return HttpConstants.SUCCESS_CODE;
} else {
throw new SiddhiAppRuntimeException("Siddhi App " + siddhiAppContext.getName() + " failed to publish " +
"events to HTTP endpoint", responseListener.throwable);
}
return HttpConstants.SUCCESS_CODE;
}
}

Expand Down Expand Up @@ -1154,22 +1121,26 @@ private String encodeBase64(String consumerKeyValue) {
}

static class HTTPResponseListener implements HttpConnectorListener {
Throwable throwable;
CountDownLatch countDownLatch;
Object payload;
DynamicOptions dynamicOptions;
HttpSink httpSink;

HTTPResponseListener(CountDownLatch latch) {
this.countDownLatch = latch;
HTTPResponseListener(Object payload, DynamicOptions dynamicOptions, HttpSink httpSink) {
this.payload = payload;
this.dynamicOptions = dynamicOptions;
this.httpSink = httpSink;
}

@Override
public void onMessage(HttpCarbonMessage httpCarbonMessage) {
countDownLatch.countDown();

}

@Override
public void onError(Throwable throwable) {
this.throwable = throwable;
countDownLatch.countDown();
httpSink.onError(payload, dynamicOptions,
new ConnectionUnavailableException("Siddhi App " + httpSink.siddhiAppContext.getName() +
" failed to publish events to HTTP endpoint: " + httpSink.publisherURL, throwable));
}
}
}
Loading