Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Collectors and comparators configured by single config number #451

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ All notable changes to AET will be documented in this file.

**List of changes that are finished but not yet released in any final version.**

- [PR-451](https://github.com/Cognifide/aet/pull/451) Collectors and comparators configured by single config number
- [PR-449](https://github.com/Cognifide/aet/pull/449) Improvements to the Winter Edition Theme
- [PR-354](https://github.com/Cognifide/aet/pull/354) Remove jmsEndpointConfig information from communication settings endpoint ([#352](https://github.com/Cognifide/aet/issues/352))
- [PR-412](https://github.com/Cognifide/aet/pull/412) ([PR-336](https://github.com/Cognifide/aet/pull/336), [PR-337](https://github.com/Cognifide/aet/pull/337), [PR-395](https://github.com/Cognifide/aet/pull/395)) - Added rerun functionality for suite, test and url
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/**
* AET
*
* Copyright (C) 2013 Cognifide Limited
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package com.cognifide.aet.communication.api.queues;

public enum QueuesConstant implements WorkerConfig {
COLLECTOR("collectorJobs", "collectorResults"),
COMPARATOR("comparatorJobs", "comparatorResults");

public static final String NAMESPACE = "AET.";

final String jobsQueueName;
final String resultsQueueName;

QueuesConstant(String jobsQueueName, String resultsQueueName) {
this.jobsQueueName = jobsQueueName;
this.resultsQueueName = resultsQueueName;
}

@Override
public String getJobsQueueName() {
return NAMESPACE + jobsQueueName;
}

@Override
public String getResultsQueueName() {
return NAMESPACE + resultsQueueName;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/**
* AET
*
* Copyright (C) 2013 Cognifide Limited
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package com.cognifide.aet.communication.api.queues;

public interface WorkerConfig {

/**
* @return name of the jobs queue
*/
String getJobsQueueName();

/**
* @return name of the results queue name
*/
String getResultsQueueName();

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.cognifide.aet.runner;

import com.cognifide.aet.communication.api.exceptions.AETException;
import com.cognifide.aet.communication.api.queues.QueuesConstant;
import com.cognifide.aet.runner.configuration.MessagesManagerConf;
import java.io.IOException;
import java.util.HashSet;
Expand All @@ -25,7 +26,6 @@
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
import org.apache.commons.lang3.StringUtils;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.metatype.annotations.Designate;
Expand All @@ -48,8 +48,6 @@ public class MessagesManager {

private static final String QUEUES_ATTRIBUTE = "Queues";

private static final String AET_QUEUE_DOMAIN = "AET.";

static final String DESTINATION_NAME_PROPERTY = "destinationName";

private MessagesManagerConf config;
Expand Down Expand Up @@ -83,13 +81,6 @@ public void remove(String correlationID) throws AETException {
}
}

public static String createFullQueueName(String name) {
if (StringUtils.isBlank(name)) {
throw new IllegalArgumentException("Queue name can't be null or empty string!");
}
return AET_QUEUE_DOMAIN + name;
}

protected Set<ObjectName> getAetQueuesObjects(MBeanServerConnection connection)
throws AETException {
ObjectName[] queues;
Expand All @@ -106,7 +97,8 @@ protected Set<ObjectName> getAetQueuesObjects(MBeanServerConnection connection)
private Set<ObjectName> filter(ObjectName[] queuesObjects) {
Set<ObjectName> queues = new HashSet<>();
for (ObjectName queueObject : queuesObjects) {
if (queueObject.getKeyProperty(DESTINATION_NAME_PROPERTY).startsWith(AET_QUEUE_DOMAIN)) {
// filters all with the AET queue namespace
if (queueObject.getKeyProperty(DESTINATION_NAME_PROPERTY).startsWith(QueuesConstant.NAMESPACE)) {
queues.add(queueObject);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.cognifide.aet.communication.api.queues.JmsConnection;
import com.cognifide.aet.communication.api.wrappers.Run;
import com.cognifide.aet.queues.JmsUtils;
import com.cognifide.aet.communication.api.queues.QueuesConstant;
import com.cognifide.aet.runner.scheduler.CollectorJobSchedulerService;
import javax.jms.Destination;
import javax.jms.JMSException;
Expand All @@ -45,11 +46,9 @@ public class RunnerMessageListener implements MessageListener {

private static final Logger LOGGER = LoggerFactory.getLogger(RunnerMessageListener.class);

private static final String API_QUEUE_IN = MessagesManager
.createFullQueueName("runner-in");
private static final String API_QUEUE_IN = QueuesConstant.NAMESPACE + "runner-in";

private static final String MAINTENANCE_QUEUE_IN = MessagesManager
.createFullQueueName("maintenance-in");
private static final String MAINTENANCE_QUEUE_IN = QueuesConstant.NAMESPACE + "maintenance-in";

private MessageConsumer inConsumer;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@
import com.cognifide.aet.communication.api.queues.JmsConnection;
import com.cognifide.aet.communication.api.wrappers.MetadataRunDecorator;
import com.cognifide.aet.communication.api.wrappers.UrlRunWrapper;
import com.cognifide.aet.communication.api.queues.QueuesConstant;
import com.cognifide.aet.runner.RunnerConfiguration;
import com.cognifide.aet.runner.processing.TimeoutWatch;
import com.cognifide.aet.runner.processing.data.wrappers.RunIndexWrapper;
import com.cognifide.aet.runner.scheduler.CollectorJobSchedulerService;
import com.cognifide.aet.runner.scheduler.MessageWithDestination;
import com.cognifide.aet.runner.processing.TimeoutWatch;
import com.google.common.collect.Queues;
import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -105,7 +106,7 @@ protected String getQueueInName() {

@Override
protected String getQueueOutName() {
return "AET.collectorJobs";
return QueuesConstant.COLLECTOR.getJobsQueueName();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import com.cognifide.aet.communication.api.metadata.Url;
import com.cognifide.aet.communication.api.queues.JmsConnection;
import com.cognifide.aet.communication.api.util.ExecutionTimer;
import com.cognifide.aet.runner.MessagesManager;
import com.cognifide.aet.communication.api.queues.QueuesConstant;
import com.cognifide.aet.runner.RunnerConfiguration;
import com.cognifide.aet.runner.processing.TimeoutWatch;
import com.cognifide.aet.runner.processing.data.wrappers.RunIndexWrapper;
Expand Down Expand Up @@ -184,12 +184,12 @@ public boolean isFinished() {

@Override
protected String getQueueInName() {
return MessagesManager.createFullQueueName("collectorResults");
return QueuesConstant.COLLECTOR.getResultsQueueName();
}

@Override
protected String getQueueOutName() {
return MessagesManager.createFullQueueName("comparatorJobs");
return QueuesConstant.COMPARATOR.getJobsQueueName();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import com.cognifide.aet.communication.api.metadata.Url;
import com.cognifide.aet.communication.api.queues.JmsConnection;
import com.cognifide.aet.communication.api.util.ExecutionTimer;
import com.cognifide.aet.runner.MessagesManager;
import com.cognifide.aet.communication.api.queues.QueuesConstant;
import com.cognifide.aet.runner.RunnerConfiguration;
import com.cognifide.aet.runner.processing.TimeoutWatch;
import com.cognifide.aet.runner.processing.data.wrappers.RunIndexWrapper;
Expand Down Expand Up @@ -150,7 +150,7 @@ public void abort() {

@Override
protected String getQueueInName() {
return MessagesManager.createFullQueueName("comparatorResults");
return QueuesConstant.COMPARATOR.getResultsQueueName();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
*/
package com.cognifide.aet.runner;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.times;
Expand All @@ -41,22 +39,6 @@ public class MessagesManagerTest {
@Mock
private MessagesManagerConf config;

@Test(expected = IllegalArgumentException.class)
public void createFullQueueName_whenNameIsNull_expectException() throws Exception {
MessagesManager.createFullQueueName(null);
}

@Test(expected = IllegalArgumentException.class)
public void createFullQueueName_whenNameIsEmpty_expectException() throws Exception {
MessagesManager.createFullQueueName("");
}

@Test
public void createFullQueueName_expectFullName() throws Exception {
String fullQueueName = MessagesManager.createFullQueueName("test");
assertThat(fullQueueName, is("AET.test"));
}

@Test
public void remove_ExpectRemovingInvoked() throws Exception {
MessagesManager messagesManager = new MessagesManager();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/**
* AET
*
* Copyright (C) 2013 Cognifide Limited
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package com.cognifide.aet.worker.exceptions;

/**
* Thrown when failed to create consumer.
*/
public class ConsumerInitException extends RuntimeException {

public ConsumerInitException(String message, Throwable cause) {
super(message, cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,42 +31,22 @@
import javax.jms.JMSException;
import javax.jms.Message;
import org.apache.commons.lang3.StringUtils;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.metatype.annotations.Designate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Component(
service = CollectorMessageListenerImpl.class,
immediate = true)
@Designate(ocd = CollectorMessageListenerImplConfig.class, factory = true)
public class CollectorMessageListenerImpl extends AbstractTaskMessageListener {
class CollectorMessageListener extends WorkerMessageListener {

private static final Logger LOGGER = LoggerFactory.getLogger(CollectorMessageListenerImpl.class);
private static final Logger LOGGER = LoggerFactory.getLogger(CollectorMessageListener.class);

@Reference
private JmsConnection jmsConnection;
private final CollectorDispatcher dispatcher;
private final WebDriverProvider webDriverProvider;

@Reference
private CollectorDispatcher dispatcher;

@Reference
private WebDriverProvider webDriverProvider;

private CollectorMessageListenerImplConfig config;

@Activate
void activate(CollectorMessageListenerImplConfig config) {
this.config = config;
super.doActivate(config.consumerQueueName(), config.producerQueueName(), config.pf());
}

@Deactivate
void deactivate() {
super.doDeactivate();
CollectorMessageListener(String name, CollectorDispatcher dispatcher,
WebDriverProvider webDriverProvider, JmsConnection jmsConnection, String consumerQueueName,
String producerQueueName) {
super(name, jmsConnection, consumerQueueName, producerQueueName);
this.dispatcher = dispatcher;
this.webDriverProvider = webDriverProvider;
}

@Override
Expand All @@ -75,15 +55,15 @@ public void onMessage(final Message message) {
try {
collectorJobData = JmsUtils.getFromMessage(message, CollectorJobData.class);
} catch (JMSException e) {
LOGGER.error("Invalid message obtained!", e);
LOGGER.error("[{}] Invalid message obtained!", name, e);
}
String correlationId = JmsUtils.getJMSCorrelationID(message);
String requestMessageId = JmsUtils.getJMSMessageID(message);
if (collectorJobData != null && StringUtils.isNotBlank(correlationId)
&& requestMessageId != null) {
LOGGER.info(
"CollectorJobData [{}] message arrived with {} urls. CorrelationId: {} RequestMessageId: {}",
config.name(), collectorJobData.getUrls().size(), correlationId,
"[{}] CollectorJobData message arrived with {} urls. CorrelationId: {} RequestMessageId: {}",
name, collectorJobData.getUrls().size(), correlationId,
requestMessageId);
WebCommunicationWrapper webCommunicationWrapper = null;
int collected = 0;
Expand All @@ -101,7 +81,7 @@ public void onMessage(final Message message) {
} catch (WorkerException e) {
for (Url url : collectorJobData.getUrls()) {
String errorMessage = String.format(
"Couldn't process following url `%s` because of error: %s", url.getUrl(),
"[%s] Couldn't process following url `%s` because of error: %s", name, url.getUrl(),
e.getMessage());
LOGGER.error(errorMessage, e);
// updates all steps with worker exception
Expand All @@ -118,7 +98,7 @@ public void onMessage(final Message message) {
} finally {
quitWebDriver(webCommunicationWrapper);
}
LOGGER.info("Successfully collected from {}/{} urls.", collected,
LOGGER.info("[{}] Successfully collected from {}/{} urls.", name, collected,
collectorJobData.getUrls().size());
}

Expand All @@ -140,9 +120,8 @@ private int runUrls(CollectorJobData collectorJobData, String requestMessageId,
processedUrl.setCollectionStats(timer.toStatistics());
feedbackQueue.sendObjectMessageWithCorrelationID(collectorResultData, correlationId);
} catch (Exception e) {
LOGGER.error("Unrecognized collector error", e);
LOGGER.error("[{}] Unrecognized collector error", name, e);
final String message = "Unrecognized collector error: " + e.getMessage();

CollectorStepResult collectorStepProcessingError =
CollectorStepResult.newProcessingErrorResult(message);
for (Step step : url.getSteps()) {
Expand Down Expand Up @@ -175,9 +154,4 @@ private void quitWebDriver(WebCommunicationWrapper webCommunicationWrapper) {
}
}

@Override
protected JmsConnection getJmsConnection() {
return jmsConnection;
}

}
Loading