Skip to content

Commit

Permalink
#215 - Polishing.
Browse files Browse the repository at this point in the history
Reintroduce deprecated setBeanFactory(…) method.
Extract code into methods.
Ensure that versioned entities are eagerly initialized to allow retries.

Original pull request: #397.
  • Loading branch information
mp911de authored and schauder committed Jul 22, 2020
1 parent a70bd0a commit 4140981
Showing 1 changed file with 82 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import org.reactivestreams.Publisher;

import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.core.convert.ConversionService;
Expand Down Expand Up @@ -77,7 +79,7 @@
* @author Bogdan Ilchyshyn
* @since 1.1
*/
public class R2dbcEntityTemplate implements R2dbcEntityOperations, ApplicationContextAware {
public class R2dbcEntityTemplate implements R2dbcEntityOperations, BeanFactoryAware, ApplicationContextAware {

private final DatabaseClient databaseClient;

Expand Down Expand Up @@ -123,6 +125,15 @@ public DatabaseClient getDatabaseClient() {
return this.databaseClient;
}

/*
* (non-Javadoc)
* @see org.springframework.beans.factory.BeanFactoryAware#setBeanFactory(org.springframework.beans.factory.BeanFactory)
* @deprecated since 1.2 in favor of #setApplicationContext.
*/
@Override
@Deprecated
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {}

/*
* (non-Javadoc)
* @see org.springframework.context.ApplicationContextAware#setApplicationContext(org.springframework.context.ApplicationContext)
Expand Down Expand Up @@ -431,33 +442,38 @@ <T> Mono<T> doInsert(T entity, SqlIdentifier tableName) {

RelationalPersistentEntity<T> persistentEntity = getRequiredEntity(entity);

return Mono.defer(() -> maybeCallBeforeConvert(setVersionIfNecessary(persistentEntity, entity), tableName)
T entityWithVersion = setVersionIfNecessary(persistentEntity, entity);

return maybeCallBeforeConvert(entityWithVersion, tableName)
.flatMap(beforeConvert -> {

OutboundRow outboundRow = dataAccessStrategy.getOutboundRow(beforeConvert);

return maybeCallBeforeSave(beforeConvert, outboundRow, tableName).flatMap(entityToSave -> {
return maybeCallBeforeSave(beforeConvert, outboundRow, tableName) //
.flatMap(entityToSave -> doInsert(entityToSave, tableName, outboundRow));
});
}

private <T> Mono<T> doInsert(T entity, SqlIdentifier tableName, OutboundRow outboundRow) {

StatementMapper mapper = dataAccessStrategy.getStatementMapper();
StatementMapper.InsertSpec insert = mapper.createInsert(tableName);
StatementMapper mapper = dataAccessStrategy.getStatementMapper();
StatementMapper.InsertSpec insert = mapper.createInsert(tableName);

for (SqlIdentifier column : outboundRow.keySet()) {
SettableValue settableValue = outboundRow.get(column);
if (settableValue.hasValue()) {
insert = insert.withColumn(column, settableValue);
}
}
for (SqlIdentifier column : outboundRow.keySet()) {
SettableValue settableValue = outboundRow.get(column);
if (settableValue.hasValue()) {
insert = insert.withColumn(column, settableValue);
}
}

PreparedOperation<?> operation = mapper.getMappedObject(insert);
PreparedOperation<?> operation = mapper.getMappedObject(insert);

return this.databaseClient.execute(operation) //
.filter(statement -> statement.returnGeneratedValues())
.map(this.dataAccessStrategy.getConverter().populateIdIfNecessary(entityToSave)) //
.first() //
.defaultIfEmpty(entityToSave) //
.flatMap(saved -> maybeCallAfterSave(saved, outboundRow, tableName));
});
}));
return this.databaseClient.execute(operation) //
.filter(statement -> statement.returnGeneratedValues())
.map(this.dataAccessStrategy.getConverter().populateIdIfNecessary(entity)) //
.first() //
.defaultIfEmpty(entity) //
.flatMap(saved -> maybeCallAfterSave(saved, outboundRow, tableName));
}

@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -493,9 +509,22 @@ private <T> Mono<T> doUpdate(T entity, SqlIdentifier tableName) {

RelationalPersistentEntity<T> persistentEntity = getRequiredEntity(entity);

return maybeCallBeforeConvert(entity, tableName).flatMap(beforeConvert -> {
T entityToUse;
Criteria matchingVersionCriteria;

if (persistentEntity.hasVersionProperty()) {

OutboundRow outboundRow = dataAccessStrategy.getOutboundRow(entity);
matchingVersionCriteria = createMatchingVersionCriteria(entity, persistentEntity);
entityToUse = incrementVersion(persistentEntity, entity);
} else {

entityToUse = entity;
matchingVersionCriteria = null;
}

return maybeCallBeforeConvert(entityToUse, tableName).flatMap(beforeConvert -> {

OutboundRow outboundRow = dataAccessStrategy.getOutboundRow(beforeConvert);

return maybeCallBeforeSave(beforeConvert, outboundRow, tableName) //
.flatMap(entityToSave -> {
Expand All @@ -504,43 +533,44 @@ private <T> Mono<T> doUpdate(T entity, SqlIdentifier tableName) {
SettableValue id = outboundRow.remove(idColumn);
Criteria criteria = Criteria.where(dataAccessStrategy.toSql(idColumn)).is(id);

T saved;

if (persistentEntity.hasVersionProperty()) {
criteria = criteria.and(createMatchingVersionCriteria(entity, persistentEntity));
saved = incrementVersion(persistentEntity, entity, outboundRow);
} else {
saved = entityToSave;
if (matchingVersionCriteria != null) {
criteria = criteria.and(matchingVersionCriteria);
}

Update update = Update.from((Map) outboundRow);
return doUpdate(entityToSave, tableName, persistentEntity, criteria, outboundRow);
});
});
}

@SuppressWarnings({ "unchecked", "rawtypes" })
private <T> Mono<T> doUpdate(T entity, SqlIdentifier tableName, RelationalPersistentEntity<T> persistentEntity,
Criteria criteria, OutboundRow outboundRow) {

StatementMapper mapper = dataAccessStrategy.getStatementMapper();
StatementMapper.UpdateSpec updateSpec = mapper.createUpdate(tableName, update).withCriteria(criteria);
Update update = Update.from((Map) outboundRow);

PreparedOperation<?> operation = mapper.getMappedObject(updateSpec);
StatementMapper mapper = dataAccessStrategy.getStatementMapper();
StatementMapper.UpdateSpec updateSpec = mapper.createUpdate(tableName, update).withCriteria(criteria);

return this.databaseClient.execute(operation) //
.fetch() //
.rowsUpdated() //
.handle((rowsUpdated, sink) -> {
PreparedOperation<?> operation = mapper.getMappedObject(updateSpec);

if (rowsUpdated != 0) {
return;
}
return this.databaseClient.execute(operation) //
.fetch() //
.rowsUpdated() //
.handle((rowsUpdated, sink) -> {

if (persistentEntity.hasVersionProperty()) {
sink.error(new OptimisticLockingFailureException(
formatOptimisticLockingExceptionMessage(saved, persistentEntity)));
} else {
sink.error(new TransientDataAccessResourceException(
formatTransientEntityExceptionMessage(saved, persistentEntity)));
}
}).then(maybeCallAfterSave(saved, outboundRow, tableName));
});
});
}
if (rowsUpdated != 0) {
return;
}

if (persistentEntity.hasVersionProperty()) {
sink.error(new OptimisticLockingFailureException(
formatOptimisticLockingExceptionMessage(entity, persistentEntity)));
} else {
sink.error(new TransientDataAccessResourceException(
formatTransientEntityExceptionMessage(entity, persistentEntity)));
}
}).then(maybeCallAfterSave(entity, outboundRow, tableName));
}

private <T> String formatOptimisticLockingExceptionMessage(T entity, RelationalPersistentEntity<T> persistentEntity) {

Expand All @@ -555,7 +585,7 @@ private <T> String formatTransientEntityExceptionMessage(T entity, RelationalPer
}

@SuppressWarnings("unchecked")
private <T> T incrementVersion(RelationalPersistentEntity<T> persistentEntity, T entity, OutboundRow outboundRow) {
private <T> T incrementVersion(RelationalPersistentEntity<T> persistentEntity, T entity) {

PersistentPropertyAccessor<?> propertyAccessor = persistentEntity.getPropertyAccessor(entity);
RelationalPersistentProperty versionProperty = persistentEntity.getVersionProperty();
Expand All @@ -569,8 +599,6 @@ private <T> T incrementVersion(RelationalPersistentEntity<T> persistentEntity, T
Class<?> versionPropertyType = versionProperty.getType();
propertyAccessor.setProperty(versionProperty, conversionService.convert(newVersionValue, versionPropertyType));

outboundRow.put(versionProperty.getColumnName(), SettableValue.from(newVersionValue));

return (T) propertyAccessor.getBean();
}

Expand Down

0 comments on commit 4140981

Please sign in to comment.