Skip to content

Commit

Permalink
Migrate from Proto1 to Proto2 (#282)
Browse files Browse the repository at this point in the history
* replaced ProtocolMessage with protobuf.Message

* wip

* wip

* wip

* wip
  • Loading branch information
srinjoyray authored Oct 7, 2024
1 parent dafbdac commit e2292f1
Show file tree
Hide file tree
Showing 26 changed files with 528 additions and 477 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import com.google.apphosting.datastore.proto2api.DatastoreV3Pb;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.storage.onestore.v3.OnestoreEntity;
import com.google.storage.onestore.v3.proto2api.OnestoreEntity;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
Expand Down Expand Up @@ -266,10 +266,10 @@ public Set<Index> compositeIndexesForQuery(Query query) {
for (DatastoreV3Pb.Query queryProto : pbQueries) {
IndexComponentsOnlyQuery indexQuery = new IndexComponentsOnlyQuery(queryProto);

OnestoreEntity.Index index =
OnestoreEntity.Index.Builder index =
factory.getCompositeIndexManager().compositeIndexForQuery(indexQuery);
if (index != null) {
resultSet.add(IndexTranslator.convertFromPb(index));
resultSet.add(IndexTranslator.convertFromPb(index.build()));
}
}
return resultSet;
Expand All @@ -293,10 +293,10 @@ public Set<Index> minimumCompositeIndexesForQuery(Query query, Collection<Index>
for (DatastoreV3Pb.Query queryProto : pbQueries) {
IndexComponentsOnlyQuery indexQuery = new IndexComponentsOnlyQuery(queryProto);

OnestoreEntity.Index index =
OnestoreEntity.Index.Builder index =
factory.getCompositeIndexManager().minimumCompositeIndexForQuery(indexQuery, indexPbs);
if (index != null) {
resultSet.add(IndexTranslator.convertFromPb(index));
resultSet.add(IndexTranslator.convertFromPb(index.build()));
}
}
return resultSet;
Expand All @@ -316,8 +316,8 @@ private static List<DatastoreV3Pb.Query> convertQueryToPbs(
for (List<FilterPredicate> singleQuery : parallelQueries) {
Query newQuery = new Query(query);
newQuery.getFilterPredicates().addAll(singleQuery);
DatastoreV3Pb.Query queryProto = QueryTranslator.convertToPb(newQuery, fetchOptions);
resultQueries.add(queryProto);
DatastoreV3Pb.Query.Builder queryProto = QueryTranslator.convertToPb(newQuery, fetchOptions);
resultQueries.add(queryProto.build());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@
import com.google.apphosting.datastore.proto2api.DatastoreV3Pb.PutResponse;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.io.protocol.ProtocolMessage;
// import com.google.io.protocol.ProtocolMessage;
import com.google.protobuf.Message;
import com.google.storage.onestore.v3.proto2api.OnestoreEntity.CompositeIndex;
import com.google.storage.onestore.v3.proto2api.OnestoreEntity.EntityProto;
import com.google.storage.onestore.v3.proto2api.OnestoreEntity.Reference;
Expand Down Expand Up @@ -77,10 +78,10 @@ class AsyncDatastoreServiceImpl extends BaseAsyncDatastoreServiceImpl {
* @param <T> the proto representation of value
*/
private abstract class V3Batcher<
S extends ProtocolMessage<S>,
R extends ProtocolMessage<R>,
S extends Message,
R extends Message,
F,
T extends ProtocolMessage<T>>
T extends Message>
extends BaseRpcBatcher<S, R, F, T> {
@Override
final R newBatch(R baseBatch) {
Expand All @@ -94,7 +95,7 @@ final R newBatch(R baseBatch) {
* @param <S> the response message type
* @param <R> the request message type
*/
private abstract class V3KeyBatcher<S extends ProtocolMessage<S>, R extends ProtocolMessage<R>>
private abstract class V3KeyBatcher<S extends Message, R extends Message>
extends V3Batcher<S, R, Key, Reference> {
@Override
final Object getGroup(Key value) {
Expand All @@ -103,15 +104,15 @@ final Object getGroup(Key value) {

@Override
final Reference toPb(Key value) {
return KeyTranslator.convertToPb(value);
return KeyTranslator.convertToPb(value).build();
}
}

private final V3KeyBatcher<DeleteResponse, DeleteRequest> deleteBatcher =
new V3KeyBatcher<DeleteResponse, DeleteRequest>() {
@Override
void addToBatch(Reference value, DeleteRequest batch) {
batch.addKey(value);
batch.toBuilder().addKey(value).build();
}

@Override
Expand All @@ -122,15 +123,15 @@ int getMaxCount() {
@Override
protected Future<DeleteResponse> makeCall(DeleteRequest batch) {
return makeAsyncCall(
apiConfig, DatastoreService_3.Method.Delete, batch, new DeleteResponse());
apiConfig, DatastoreService_3.Method.Delete, batch, DeleteResponse.newBuilder().build());
}
};

private final V3KeyBatcher<GetResponse, GetRequest> getByKeyBatcher =
new V3KeyBatcher<GetResponse, GetRequest>() {
@Override
void addToBatch(Reference value, GetRequest batch) {
batch.addKey(value);
batch.toBuilder().addKey(value).build();
}

@Override
Expand All @@ -140,7 +141,7 @@ int getMaxCount() {

@Override
protected Future<GetResponse> makeCall(GetRequest batch) {
return makeAsyncCall(apiConfig, DatastoreService_3.Method.Get, batch, new GetResponse());
return makeAsyncCall(apiConfig, DatastoreService_3.Method.Get, batch, GetResponse.newBuilder().build());
}
};

Expand All @@ -158,7 +159,7 @@ final Reference toPb(Reference value) {

@Override
void addToBatch(Reference value, GetRequest batch) {
batch.addKey(value);
batch.toBuilder().addKey(value).build();
}

@Override
Expand All @@ -168,7 +169,7 @@ int getMaxCount() {

@Override
protected Future<GetResponse> makeCall(GetRequest batch) {
return makeAsyncCall(apiConfig, DatastoreService_3.Method.Get, batch, new GetResponse());
return makeAsyncCall(apiConfig, DatastoreService_3.Method.Get, batch, GetResponse.newBuilder().build());
}
};

Expand All @@ -181,7 +182,7 @@ Object getGroup(Entity value) {

@Override
void addToBatch(EntityProto value, PutRequest batch) {
batch.addEntity(value);
batch.toBuilder().addEntity(value);
}

@Override
Expand All @@ -191,7 +192,7 @@ int getMaxCount() {

@Override
protected Future<PutResponse> makeCall(PutRequest batch) {
return makeAsyncCall(apiConfig, DatastoreService_3.Method.Put, batch, new PutResponse());
return makeAsyncCall(apiConfig, DatastoreService_3.Method.Put, batch, PutResponse.newBuilder().build());
}

@Override
Expand All @@ -215,8 +216,8 @@ public AsyncDatastoreServiceImpl(

@Override
protected TransactionImpl.InternalTransaction doBeginTransaction(TransactionOptions options) {
DatastoreV3Pb.Transaction remoteTxn = new DatastoreV3Pb.Transaction();
DatastoreV3Pb.BeginTransactionRequest request = new DatastoreV3Pb.BeginTransactionRequest();
DatastoreV3Pb.Transaction.Builder remoteTxn = DatastoreV3Pb.Transaction.newBuilder();
DatastoreV3Pb.BeginTransactionRequest.Builder request = DatastoreV3Pb.BeginTransactionRequest.newBuilder();
request.setApp(datastoreServiceConfig.getAppIdNamespace().getAppId());
request.setAllowMultipleEg(options.isXG());
if (options.previousTransaction() != null) {
Expand Down Expand Up @@ -246,7 +247,7 @@ protected TransactionImpl.InternalTransaction doBeginTransaction(TransactionOpti

Future<DatastoreV3Pb.Transaction> future =
DatastoreApiHelper.makeAsyncCall(
apiConfig, DatastoreService_3.Method.BeginTransaction, request, remoteTxn);
apiConfig, DatastoreService_3.Method.BeginTransaction, request.build(), remoteTxn.build());

return new InternalTransactionV3(apiConfig, request.getApp(), future);
}
Expand All @@ -255,7 +256,7 @@ protected TransactionImpl.InternalTransaction doBeginTransaction(TransactionOpti
protected final Future<Map<Key, Entity>> doBatchGet(
@Nullable Transaction txn, final Set<Key> keysToGet, final Map<Key, Entity> resultMap) {
// Initializing base request.
final GetRequest baseReq = new GetRequest();
final GetRequest.Builder baseReq = GetRequest.newBuilder();
baseReq.setAllowDeferred(true);
if (txn != null) {
TransactionImpl.ensureTxnActive(txn);
Expand All @@ -272,7 +273,7 @@ protected final Future<Map<Key, Entity>> doBatchGet(
// Batch and issue the request(s).
Iterator<GetRequest> batches =
getByKeyBatcher.getBatches(
keysToGet, baseReq, baseReq.getSerializedSize(), shouldUseMultipleBatches);
keysToGet, baseReq.build(), baseReq.getKeyCount(), shouldUseMultipleBatches);
List<Future<GetResponse>> futures = getByKeyBatcher.makeCalls(batches);

return registerInTransaction(
Expand Down Expand Up @@ -329,7 +330,7 @@ private void aggregate(
GetResponse resp =
getFutureWithOptionalTimeout(currentFuture, timeout, timeoutUnit);
addEntitiesToResultMap(resp);
deferredRefs.addAll(resp.deferreds());
deferredRefs.addAll(resp.getDeferredList());
}

if (deferredRefs.isEmpty()) {
Expand All @@ -340,7 +341,7 @@ private void aggregate(
// Some keys were deferred. Issue followup requests, and loop again.
Iterator<GetRequest> followupBatches =
getByReferenceBatcher.getBatches(
deferredRefs, baseReq, baseReq.getSerializedSize(), shouldUseMultipleBatches);
deferredRefs, baseReq.build(), baseReq.getKeyCount(), shouldUseMultipleBatches);
currentFutures = getByReferenceBatcher.makeCalls(followupBatches);
}
}
Expand Down Expand Up @@ -370,14 +371,14 @@ private GetResponse getFutureWithOptionalTimeout(
* #findKeyFromRequestIgnoringAppId(Reference)}
*/
private void addEntitiesToResultMap(GetResponse response) {
for (GetResponse.Entity entityResult : response.entitys()) {
for (GetResponse.Entity entityResult : response.getEntityList()) {
if (entityResult.hasEntity()) {
Entity responseEntity = EntityTranslator.createFromPb(entityResult.getEntity());
Key responseKey = responseEntity.getKey();

// Hack for Remote API which rewrites App Ids on Keys.
if (!keysToGet.contains(responseKey)) {
responseKey = findKeyFromRequestIgnoringAppId(entityResult.getEntity().getKey());
responseKey = findKeyFromRequestIgnoringAppId(entityResult.getEntity().getKey().toBuilder());
}
resultMap.put(responseKey, responseEntity);
}
Expand Down Expand Up @@ -406,14 +407,14 @@ private void addEntitiesToResultMap(GetResponse response) {
* @return the Key from the request that corresponds to the given Reference from the
* Response (ignoring AppId.)
*/
private Key findKeyFromRequestIgnoringAppId(Reference referenceFromResponse) {
private Key findKeyFromRequestIgnoringAppId(Reference.Builder referenceFromResponse) {
// We'll create this Map lazily the first time, then cache it for future calls.
if (keyMapIgnoringAppId == null) {
keyMapIgnoringAppId = Maps.newHashMap();
for (Key requestKey : keysToGet) {
Reference requestKeyAsRefWithoutApp =
Reference.Builder requestKeyAsRefWithoutApp =
KeyTranslator.convertToPb(requestKey).clearApp();
keyMapIgnoringAppId.put(requestKeyAsRefWithoutApp, requestKey);
keyMapIgnoringAppId.put(requestKeyAsRefWithoutApp.build(), requestKey);
}
}

Expand All @@ -430,15 +431,15 @@ private Key findKeyFromRequestIgnoringAppId(Reference referenceFromResponse) {

@Override
protected Future<List<Key>> doBatchPut(@Nullable Transaction txn, final List<Entity> entities) {
PutRequest baseReq = new PutRequest();
PutRequest.Builder baseReq = PutRequest.newBuilder();
if (txn != null) {
TransactionImpl.ensureTxnActive(txn);
baseReq.setTransaction(InternalTransactionV3.toProto(txn));
}
boolean group = !baseReq.hasTransaction(); // Do not group when inside a transaction.
final List<Integer> order = Lists.newArrayListWithCapacity(entities.size());
Iterator<PutRequest> batches =
putBatcher.getBatches(entities, baseReq, baseReq.getSerializedSize(), group, order);
putBatcher.getBatches(entities, baseReq.build(), baseReq.getEntityCount(), group, order);
List<Future<PutResponse>> futures = putBatcher.makeCalls(batches);

return registerInTransaction(
Expand All @@ -447,7 +448,7 @@ protected Future<List<Key>> doBatchPut(@Nullable Transaction txn, final List<Ent
@Override
protected List<Key> aggregate(
PutResponse intermediateResult, Iterator<Integer> indexItr, List<Key> result) {
for (Reference reference : intermediateResult.keys()) {
for (Reference reference : intermediateResult.getKeyList()) {
int index = indexItr.next();
Key key = entities.get(index).getKey();
KeyTranslator.updateKey(reference, key);
Expand All @@ -467,14 +468,14 @@ protected List<Key> initResult() {

@Override
protected Future<Void> doBatchDelete(@Nullable Transaction txn, Collection<Key> keys) {
DeleteRequest baseReq = new DeleteRequest();
DeleteRequest.Builder baseReq = DeleteRequest.newBuilder();
if (txn != null) {
TransactionImpl.ensureTxnActive(txn);
baseReq.setTransaction(InternalTransactionV3.toProto(txn));
}
boolean group = !baseReq.hasTransaction(); // Do not group inside a transaction.
Iterator<DeleteRequest> batches =
deleteBatcher.getBatches(keys, baseReq, baseReq.getSerializedSize(), group);
deleteBatcher.getBatches(keys, baseReq.build(), baseReq.getKeyCount(), group);
List<Future<DeleteResponse>> futures = deleteBatcher.makeCalls(batches);
return registerInTransaction(
txn,
Expand Down Expand Up @@ -505,7 +506,7 @@ static Reference buildAllocateIdsRef(Key parent, String kind, AppIdNamespace app
}
// the datastore just ignores the name component
Key key = new Key(kind, parent, Key.NOT_ASSIGNED, "ignored", appIdNamespace);
return KeyTranslator.convertToPb(key);
return KeyTranslator.convertToPb(key).build();
}

@Override
Expand All @@ -521,10 +522,12 @@ public Future<KeyRange> allocateIds(final Key parent, final String kind, long nu
// kind validation taken care of by the next call
final AppIdNamespace appIdNamespace = datastoreServiceConfig.getAppIdNamespace();
Reference allocateIdsRef = buildAllocateIdsRef(parent, kind, appIdNamespace);
AllocateIdsRequest req = new AllocateIdsRequest().setSize(num).setModelKey(allocateIdsRef);
AllocateIdsResponse resp = new AllocateIdsResponse();
AllocateIdsRequest.Builder req = AllocateIdsRequest.newBuilder();
req.setSize(num);
req.setModelKey(allocateIdsRef);
AllocateIdsResponse.Builder resp = AllocateIdsResponse.newBuilder();
Future<AllocateIdsResponse> future =
makeAsyncCall(apiConfig, DatastoreService_3.Method.AllocateIds, req, resp);
makeAsyncCall(apiConfig, DatastoreService_3.Method.AllocateIds, req.build(), resp.build());
return new FutureWrapper<AllocateIdsResponse, KeyRange>(future) {
@Override
protected KeyRange wrap(AllocateIdsResponse resp) throws Exception {
Expand All @@ -545,13 +548,12 @@ public Future<KeyRangeState> allocateIdRange(final KeyRange range) {
final long start = range.getStart().getId();
long end = range.getEnd().getId();

AllocateIdsRequest req =
new AllocateIdsRequest()
.setModelKey(AsyncDatastoreServiceImpl.buildAllocateIdsRef(parent, kind, null))
.setMax(end);
AllocateIdsResponse resp = new AllocateIdsResponse();
AllocateIdsRequest.Builder req = AllocateIdsRequest.newBuilder();
req.setModelKey(AsyncDatastoreServiceImpl.buildAllocateIdsRef(parent, kind, null));
req.setMax(end);
AllocateIdsResponse.Builder resp = AllocateIdsResponse.newBuilder();
Future<AllocateIdsResponse> future =
makeAsyncCall(apiConfig, DatastoreService_3.Method.AllocateIds, req, resp);
makeAsyncCall(apiConfig, DatastoreService_3.Method.AllocateIds, req.build(), resp.build());
return new FutureWrapper<AllocateIdsResponse, KeyRangeState>(future) {
@SuppressWarnings("deprecation")
@Override
Expand Down Expand Up @@ -594,13 +596,13 @@ public Future<Map<Index, IndexState>> getIndexes() {
.build();
return new FutureWrapper<CompositeIndices, Map<Index, IndexState>>(
makeAsyncCall(
apiConfig, DatastoreService_3.Method.GetIndices, req, new CompositeIndices())) {
apiConfig, DatastoreService_3.Method.GetIndices, req, CompositeIndices.newBuilder().build())) {
@Override
protected Map<Index, IndexState> wrap(CompositeIndices indices) throws Exception {
Map<Index, IndexState> answer = new LinkedHashMap<Index, IndexState>();
for (CompositeIndex ci : indices.indexs()) {
for (CompositeIndex ci : indices.getIndexList()) {
Index index = IndexTranslator.convertFromPb(ci);
switch (ci.getStateEnum()) {
switch (ci.getState()) {
case DELETED:
answer.put(index, IndexState.DELETING);
break;
Expand Down
Loading

0 comments on commit e2292f1

Please sign in to comment.