diff --git a/component/src/main/java/io/siddhi/extension/io/http/sink/HttpCallSink.java b/component/src/main/java/io/siddhi/extension/io/http/sink/HttpCallSink.java index 9c3865ac..d2d234e4 100644 --- a/component/src/main/java/io/siddhi/extension/io/http/sink/HttpCallSink.java +++ b/component/src/main/java/io/siddhi/extension/io/http/sink/HttpCallSink.java @@ -522,7 +522,8 @@ public void publish(Object payload, DynamicOptions dynamicOptions, State state) } } - private void sendOauthRequest(Object payload, DynamicOptions dynamicOptions, List
headersList) { + private void sendOauthRequest(Object payload, DynamicOptions dynamicOptions, List
headersList) + throws ConnectionUnavailableException { //generate encoded base64 auth for getting refresh token String consumerKeyValue = consumerKey + ":" + consumerSecret; String encodedAuth = "Basic " + encodeBase64(consumerKeyValue); @@ -543,13 +544,13 @@ private void sendOauthRequest(Object payload, DynamicOptions dynamicOptions, Lis } else { log.error("Error at sending oauth request to API endpoint: " + publisherURL + "', with response code: " + response + ". Message dropped. Message dropped."); - throw new HttpSinkAdaptorRuntimeException("Error at sending oauth request to API endpoint: " + + throw new ConnectionUnavailableException("Error at sending oauth request to API endpoint: " + publisherURL + "', with response code: " + response + ". Message dropped. Message dropped."); } } private void handleOAuthFailure(Object payload, DynamicOptions dynamicOptions, List
headersList, - String encodedAuth) { + String encodedAuth) throws ConnectionUnavailableException { Boolean checkFromCache = accessTokenCache.checkAvailableKey(encodedAuth); if (checkFromCache) { @@ -560,7 +561,7 @@ private void handleOAuthFailure(Object payload, DynamicOptions dynamicOptions, L } private void getNewAccessTokenWithCache(Object payload, DynamicOptions dynamicOptions, List
headersList, - String encodedAuth) { + String encodedAuth) throws ConnectionUnavailableException { String accessToken = accessTokenCache.getAccessToken(encodedAuth); for (Header header : headersList) { if (header.getName().equals(HttpConstants.AUTHORIZATION_HEADER)) { @@ -582,13 +583,13 @@ private void getNewAccessTokenWithCache(Object payload, DynamicOptions dynamicOp } else { log.error("Error at sending oauth request to API endpoint " + publisherURL + "', with response code: " + response + ". Message dropped. "); - throw new HttpSinkAdaptorRuntimeException("Error at sending oauth request to API endpoint " + + throw new ConnectionUnavailableException("Error at sending oauth request to API endpoint " + publisherURL + "', with response code: " + response + ". Message dropped."); } } private void requestForNewAccessToken(Object payload, DynamicOptions dynamicOptions, List
headersList, - String encodedAuth) { + String encodedAuth) throws ConnectionUnavailableException { Boolean checkRefreshToken = accessTokenCache.checkRefreshAvailableKey(encodedAuth); if (checkRefreshToken) { for (Header header : headersList) { @@ -633,7 +634,7 @@ private void requestForNewAccessToken(Object payload, DynamicOptions dynamicOpti } else { log.error("Error at sending oauth request to API endpoint " + publisherURL + "', with response code: " + response + ". Message dropped."); - throw new HttpSinkAdaptorRuntimeException("Error at sending oauth request to API endpoint " + + throw new ConnectionUnavailableException("Error at sending oauth request to API endpoint " + publisherURL + "', with response code: " + response + ". Message dropped."); } @@ -648,13 +649,14 @@ private void requestForNewAccessToken(Object payload, DynamicOptions dynamicOpti } else { log.error("Failed to generate new access token for the expired access token. Error code: " + accessTokenCache.getResponseCode(encodedAuth) + ". Message dropped."); - throw new HttpSinkAdaptorRuntimeException("Failed to generate new access token for the expired" + + throw new ConnectionUnavailableException("Failed to generate new access token for the expired" + " access token. Error code: " + accessTokenCache.getResponseCode(encodedAuth) + ". Message dropped."); } } - private int sendRequest(Object payload, DynamicOptions dynamicOptions, List
headersList, int tryCount) { + private int sendRequest(Object payload, DynamicOptions dynamicOptions, List
headersList, int tryCount) + throws ConnectionUnavailableException { if (!publisherURLOption.isStatic()) { super.initClientConnector(dynamicOptions); } @@ -688,13 +690,13 @@ private int sendRequest(Object payload, DynamicOptions dynamicOptions, List:',':'\"`.\n" + @@ -475,7 +468,6 @@ public class HttpSink extends Sink { private long maxWaitTime; private String hostnameVerificationEnabled; private String sslVerificationDisabled; - private boolean isBlockingIO; private DefaultHttpWsConnectorFactory httpConnectorFactory; @@ -595,9 +587,6 @@ protected StateFactory init(StreamDefinition outputStreamDefinition, OptionHolde authType = HttpConstants.NO_AUTH; } - isBlockingIO = Boolean.parseBoolean( - optionHolder.validateAndGetStaticValue(HttpConstants.BLOCKING_IO, HttpConstants.FALSE)); - initConnectorFactory(); if (publisherURLOption.isStatic()) { initClientConnector(null); @@ -632,7 +621,8 @@ public void publish(Object payload, DynamicOptions dynamicOptions, State state) } } - private void sendOauthRequest(Object payload, DynamicOptions dynamicOptions, List
headersList) { + private void sendOauthRequest(Object payload, DynamicOptions dynamicOptions, List
headersList) + throws ConnectionUnavailableException { //generate encoded base64 auth for getting refresh token String consumerKeyValue = consumerKey + ":" + consumerSecret; String encodedAuth = "Basic " + encodeBase64(consumerKeyValue) @@ -654,13 +644,13 @@ private void sendOauthRequest(Object payload, DynamicOptions dynamicOptions, Lis } else { log.error("Error at sending oauth request to API endpoint " + publisherURL + "', with response code: " + response + ". Message dropped."); - throw new HttpSinkAdaptorRuntimeException("Error at sending oauth request to API endpoint " + + throw new ConnectionUnavailableException("Error at sending oauth request to API endpoint " + publisherURL + "', and response code: " + response + ". Message dropped."); } } private void handleOAuthFailure(Object payload, DynamicOptions dynamicOptions, List
headersList, - String encodedAuth) { + String encodedAuth) throws ConnectionUnavailableException { boolean checkFromCache = accessTokenCache.checkAvailableKey(encodedAuth); if (checkFromCache) { getNewAccessTokenWithCache(payload, dynamicOptions, headersList, encodedAuth); @@ -670,7 +660,7 @@ private void handleOAuthFailure(Object payload, DynamicOptions dynamicOptions, L } private void getNewAccessTokenWithCache(Object payload, DynamicOptions dynamicOptions, List
headersList, - String encodedAuth) { + String encodedAuth) throws ConnectionUnavailableException { String accessToken = accessTokenCache.getAccessToken(encodedAuth); for (Header header : headersList) { if (header.getName().equals(HttpConstants.AUTHORIZATION_HEADER)) { @@ -692,13 +682,13 @@ private void getNewAccessTokenWithCache(Object payload, DynamicOptions dynamicOp } else { log.error("Error at sending oauth request to API endpoint " + publisherURL + "', with response code: " + response + ". Message dropped."); - throw new HttpSinkAdaptorRuntimeException("Error at sending oauth request to API endpoint " + publisherURL + + throw new ConnectionUnavailableException("Error at sending oauth request to API endpoint " + publisherURL + "', with response code: " + response + ". Message dropped."); } } private void requestForNewAccessToken(Object payload, DynamicOptions dynamicOptions, List
headersList, - String encodedAuth) { + String encodedAuth) throws ConnectionUnavailableException { Boolean checkRefreshToken = accessTokenCache.checkRefreshAvailableKey(encodedAuth); if (checkRefreshToken) { for (Header header : headersList) { @@ -744,7 +734,7 @@ private void requestForNewAccessToken(Object payload, DynamicOptions dynamicOpti } else { log.error("Error at sending oauth request to API endpoint " + publisherURL + "', with response code: " + response + ". Message dropped."); - throw new HttpSinkAdaptorRuntimeException("Error at sending oauth request to API endpoint " + + throw new ConnectionUnavailableException("Error at sending oauth request to API endpoint " + publisherURL + "', with response code: " + response + ". Message dropped."); } } else if (accessTokenCache.getResponseCode(encodedAuth) == HttpConstants.AUTHENTICATION_FAIL_CODE) { @@ -758,7 +748,7 @@ private void requestForNewAccessToken(Object payload, DynamicOptions dynamicOpti } else { log.error("Failed to generate new access token for the expired access token. Error code: " + accessTokenCache.getResponseCode(encodedAuth) + ". Message dropped."); - throw new HttpSinkAdaptorRuntimeException("Failed to generate new access token for the expired" + + throw new ConnectionUnavailableException("Failed to generate new access token for the expired" + " access token. Error code: " + accessTokenCache.getResponseCode(encodedAuth) + ". Message dropped."); } @@ -782,7 +772,7 @@ void getAccessToken(DynamicOptions dynamicOptions, String encodedAuth, String to } void setAccessToken(String encodedAuth, DynamicOptions dynamicOptions, - List
headersList) { + List
headersList) throws ConnectionUnavailableException { //check the availability of the authorization String accessToken; boolean authAvailability = false; @@ -823,7 +813,7 @@ void setAccessToken(String encodedAuth, DynamicOptions dynamicOptions, } else { log.error("Failed to generate new access token for the expired access token. Error code: " + accessTokenCache.getResponseCode(encodedAuth) + ". Message dropped."); - throw new HttpSinkAdaptorRuntimeException("Failed to generate new access token for the expired" + + throw new ConnectionUnavailableException("Failed to generate new access token for the expired" + " access token. Error code: " + accessTokenCache.getResponseCode(encodedAuth) + ". Message dropped."); } @@ -841,7 +831,8 @@ void setAccessToken(String encodedAuth, DynamicOptions dynamicOptions, } } - private int sendRequest(Object payload, DynamicOptions dynamicOptions, List
headersList) { + private int sendRequest(Object payload, DynamicOptions dynamicOptions, List
headersList) + throws ConnectionUnavailableException { if (!publisherURLOption.isStatic()) { initClientConnector(dynamicOptions); } @@ -872,45 +863,21 @@ private int sendRequest(Object payload, DynamicOptions dynamicOptions, ListWSO255.645100" + - ", POST, 'Name:John','Age:23'], type=CURRENT, next=null}"; + Thread.sleep(3000); + String expectedMessage = "Dropping event at Sink 'http' at 'BarStream' as its still trying to " + + "reconnect!, events dropped 'WSO255.645" + + "100'"; Assert.assertTrue(appender.getMessages().contains(expectedMessage)); } finally { logger.removeAppender(appender); diff --git a/pom.xml b/pom.xml index e6e273aa..a21e7c2c 100644 --- a/pom.xml +++ b/pom.xml @@ -415,7 +415,7 @@ 5.0.2 2.0.1 5.0.2 - 5.1.2 + 5.1.4 [5.0.0,6.0.0) 1.0.0-m3 4.10.0