remove Thrift dataconverter support #1006

Open · wants to merge 3 commits into base: v4.x.x
@@ -33,12 +33,10 @@
import java.lang.reflect.Type;
import java.nio.charset.StandardCharsets;
import java.util.function.Function;
import org.apache.thrift.protocol.TJSONProtocol;

/**
* Implements conversion through GSON JSON processor. To extend use {@link
* #JsonDataConverter(Function)} constructor. Thrift structures are converted using {@link
* TJSONProtocol}. When using thrift only one argument of a method is expected.
* #JsonDataConverter(Function)} constructor.
*
* @author fateev
*/
@@ -78,9 +76,7 @@ public JsonDataConverter(Function<GsonBuilder, GsonBuilder> builderInterceptor)
GsonBuilder gsonBuilder =
new GsonBuilder()
.serializeNulls()
.registerTypeAdapterFactory(new ThrowableTypeAdapterFactory())
.registerTypeAdapterFactory(new TBaseTypeAdapterFactory(metricsScope))
.registerTypeAdapterFactory(new TEnumTypeAdapterFactory());
.registerTypeAdapterFactory(new ThrowableTypeAdapterFactory());
GsonBuilder intercepted = builderInterceptor.apply(gsonBuilder);
gson = intercepted.create();
}
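
With the Thrift adapters gone, custom serialization still goes through the builderInterceptor constructor described in the javadoc above. Below is a minimal sketch of that extension point, assuming the DataConverter toData/fromData signatures in this client; the InstantAdapter and CustomConverterExample names are illustrative and are not part of this change.

import com.google.gson.GsonBuilder;
import com.google.gson.TypeAdapter;
import com.google.gson.stream.JsonReader;
import com.google.gson.stream.JsonWriter;
import com.uber.cadence.converter.DataConverter;
import com.uber.cadence.converter.JsonDataConverter;
import java.io.IOException;
import java.time.Instant;

public class CustomConverterExample {

  // Illustrative adapter that stores Instant values as ISO-8601 strings.
  private static final TypeAdapter<Instant> INSTANT_ADAPTER =
      new TypeAdapter<Instant>() {
        @Override
        public void write(JsonWriter out, Instant value) throws IOException {
          out.value(value == null ? null : value.toString());
        }

        @Override
        public Instant read(JsonReader in) throws IOException {
          return Instant.parse(in.nextString());
        }
      };

  public static void main(String[] args) {
    // Register extra Gson type adapters through the interceptor instead of
    // relying on the removed Thrift/TBase handling.
    DataConverter converter =
        new JsonDataConverter(
            builder -> builder.registerTypeAdapter(Instant.class, INSTANT_ADAPTER));

    byte[] payload = converter.toData(Instant.now());
    Instant roundTripped = converter.fromData(payload, Instant.class, Instant.class);
    System.out.println(roundTripped);
  }
}

The interceptor receives the preconfigured GsonBuilder (serializeNulls plus the Throwable adapter factory) and must return the builder the converter should use, as the constructor body above shows.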

This file was deleted.

This file was deleted.

@@ -18,11 +18,6 @@
package com.uber.cadence.internal.common;

import com.google.common.base.Defaults;
import com.google.common.collect.Lists;
import com.uber.cadence.DataBlob;
import com.uber.cadence.History;
import com.uber.cadence.HistoryEvent;
import com.uber.cadence.HistoryEventFilterType;
import com.uber.cadence.Memo;
import com.uber.cadence.SearchAttributes;
import com.uber.cadence.TaskList;
@@ -33,15 +28,10 @@
import com.uber.cadence.workflow.WorkflowMethod;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.thrift.TDeserializer;
import org.apache.thrift.TException;
import org.apache.thrift.TSerializer;

/** Utility functions shared by the implementation code. */
public final class InternalUtils {
@@ -164,93 +154,6 @@ public static SearchAttributes convertMapToSearchAttributes(
return new SearchAttributes().setIndexedFields(mapOfByteBuffer);
}

// This method serializes history to blob data
public static DataBlob SerializeFromHistoryToBlobData(History history) {

// TODO: move to global dependency after https://issues.apache.org/jira/browse/THRIFT-2218
TSerializer serializer = new TSerializer();
DataBlob blob = new DataBlob();
try {
blob.setData(serializer.serialize(history));
} catch (org.apache.thrift.TException err) {
throw new RuntimeException("Serialize history to blob data failed", err);
}

return blob;
}

// This method deserializes DataBlob data to a History object
public static History DeserializeFromBlobDataToHistory(
List<DataBlob> blobData, HistoryEventFilterType historyEventFilterType) throws TException {

// TODO: move to global dependency after https://issues.apache.org/jira/browse/THRIFT-2218
TDeserializer deSerializer = new TDeserializer();
List<HistoryEvent> events = Lists.newArrayList();
for (DataBlob data : blobData) {
History history = new History();
try {
byte[] dataByte = data.getData();
// TODO: verify the beginning index
dataByte = Arrays.copyOfRange(dataByte, 0, dataByte.length);
deSerializer.deserialize(history, dataByte);

if (history == null || history.getEvents() == null || history.getEvents().size() == 0) {
return null;
}
} catch (org.apache.thrift.TException err) {
throw new TException("Deserialize blob data to history failed with unknown error");
}

events.addAll(history.getEvents());
}

if (events.size() > 0 && historyEventFilterType == HistoryEventFilterType.CLOSE_EVENT) {
events = events.subList(events.size() - 1, events.size());
}

return new History().setEvents(events);
}

// This method serializes history event to blob data
public static List<DataBlob> SerializeFromHistoryEventToBlobData(List<HistoryEvent> events) {

// TODO: move to global dependency after https://issues.apache.org/jira/browse/THRIFT-2218
TSerializer serializer = new TSerializer();
List<DataBlob> blobs = Lists.newArrayListWithCapacity(events.size());
for (HistoryEvent event : events) {
DataBlob blob = new DataBlob();
try {
blob.setData(serializer.serialize(event));
} catch (org.apache.thrift.TException err) {
throw new RuntimeException("Serialize history event to blob data failed", err);
}
blobs.add(blob);
}
return blobs;
}

// This method deserializes blob data to history events
public static List<HistoryEvent> DeserializeFromBlobDataToHistoryEvents(List<DataBlob> blobData)
throws TException {

// TODO: move to global dependency after https://issues.apache.org/jira/browse/THRIFT-2218
TDeserializer deSerializer = new TDeserializer();
List<HistoryEvent> events = Lists.newArrayList();
for (DataBlob data : blobData) {
try {
HistoryEvent event = new HistoryEvent();
byte[] dataByte = data.getData();
// TODO: verify the beginning index
dataByte = Arrays.copyOfRange(dataByte, 0, dataByte.length);
deSerializer.deserialize(event, dataByte);
events.add(event);
} catch (org.apache.thrift.TException err) {
throw new TException("Deserialize blob data to history event failed with unknown error");
}
}
return events;
}

/** Prohibit instantiation */
private InternalUtils() {}
}
@@ -19,12 +19,9 @@

import com.google.common.collect.Lists;
import com.uber.cadence.GetWorkflowExecutionHistoryResponse;
import com.uber.cadence.History;
import com.uber.cadence.HistoryEvent;
import com.uber.cadence.HistoryEventFilterType;
import com.uber.cadence.activity.Activity;
import com.uber.cadence.common.WorkflowExecutionHistory;
import com.uber.cadence.internal.common.InternalUtils;
import com.uber.cadence.internal.common.RpcRetryer;
import com.uber.cadence.internal.common.WorkflowExecutionUtils;
import com.uber.cadence.internal.metrics.MetricsType;
@@ -185,14 +182,10 @@ protected WorkflowExecutionHistory getFullHistory(String domain, WorkflowExecuti
nextPageToken, this.serviceClient, domain, execution.toThrift()));
pageToken = resp.getNextPageToken();

// handle raw history
// TODO support raw history feature once server removes default Thrift encoding
if (resp.getRawHistory() != null && resp.getRawHistory().size() > 0) {
History history =
InternalUtils.DeserializeFromBlobDataToHistory(
resp.getRawHistory(), HistoryEventFilterType.ALL_EVENT);
if (history != null && history.getEvents() != null) {
histories.addAll(history.getEvents());
}
throw new UnsupportedOperationException(
"Raw history is not supported. Please turn off frontend.sendRawWorkflowHistory feature flag in frontend service to recover");
} else {
histories.addAll(resp.getHistory().getEvents());
}
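
For reference, a minimal sketch of the paging pattern getFullHistory relies on: follow nextPageToken until the server stops returning one, and fail fast when the frontend sends raw (Thrift-encoded) history blobs, which this change no longer decodes. The HistoryPagingSketch class and fetchAllEvents helper are assumptions for illustration; the thrift-generated request/response accessors are used as in the surrounding code.

import com.uber.cadence.GetWorkflowExecutionHistoryRequest;
import com.uber.cadence.GetWorkflowExecutionHistoryResponse;
import com.uber.cadence.HistoryEvent;
import com.uber.cadence.WorkflowExecution;
import com.uber.cadence.serviceclient.IWorkflowService;
import java.util.ArrayList;
import java.util.List;
import org.apache.thrift.TException;

final class HistoryPagingSketch {

  static List<HistoryEvent> fetchAllEvents(
      IWorkflowService service, String domain, WorkflowExecution execution) throws TException {
    List<HistoryEvent> events = new ArrayList<>();
    byte[] pageToken = null;
    do {
      GetWorkflowExecutionHistoryRequest request =
          new GetWorkflowExecutionHistoryRequest()
              .setDomain(domain)
              .setExecution(execution)
              .setNextPageToken(pageToken);
      GetWorkflowExecutionHistoryResponse response = service.GetWorkflowExecutionHistory(request);
      if (response.getRawHistory() != null && !response.getRawHistory().isEmpty()) {
        // Mirrors the new behavior above: the client no longer deserializes raw history.
        throw new UnsupportedOperationException(
            "Raw history is not supported; disable frontend.sendRawWorkflowHistory");
      }
      events.addAll(response.getHistory().getEvents());
      pageToken = response.getNextPageToken();
    } while (pageToken != null && pageToken.length > 0);
    return events;
  }
}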
@@ -18,7 +18,6 @@
package com.uber.cadence.internal.testservice;

import com.uber.cadence.BadRequestError;
import com.uber.cadence.DataBlob;
import com.uber.cadence.EntityNotExistsError;
import com.uber.cadence.EventType;
import com.uber.cadence.GetWorkflowExecutionHistoryRequest;
@@ -34,7 +33,6 @@
import com.uber.cadence.StickyExecutionAttributes;
import com.uber.cadence.WorkflowExecution;
import com.uber.cadence.WorkflowExecutionInfo;
import com.uber.cadence.internal.common.InternalUtils;
import com.uber.cadence.internal.common.WorkflowExecutionUtils;
import com.uber.cadence.internal.testservice.RequestContext.Timer;
import java.time.Duration;
@@ -348,24 +346,20 @@ public GetWorkflowExecutionHistoryResponse getWorkflowExecutionHistory(
if (!getRequest.isWaitForNewEvent()
&& getRequest.getHistoryEventFilterType() != HistoryEventFilterType.CLOSE_EVENT) {
List<HistoryEvent> events = history.getEventsLocked();
List<DataBlob> blobs = InternalUtils.SerializeFromHistoryEventToBlobData(events);
// Copy the list as it is mutable. Individual events assumed immutable.
ArrayList<HistoryEvent> eventsCopy = new ArrayList<>(events);
return new GetWorkflowExecutionHistoryResponse()
.setHistory(new History().setEvents(eventsCopy))
.setRawHistory(blobs);
.setHistory(new History().setEvents(eventsCopy));
}
expectedNextEventId = history.getNextEventIdLocked();
} finally {
lock.unlock();
}
List<HistoryEvent> events =
history.waitForNewEvents(expectedNextEventId, getRequest.getHistoryEventFilterType());
List<DataBlob> blobs = InternalUtils.SerializeFromHistoryEventToBlobData(events);
GetWorkflowExecutionHistoryResponse result = new GetWorkflowExecutionHistoryResponse();
if (events != null) {
result.setHistory(new History().setEvents(events));
result.setRawHistory(blobs);
}
return result;
}
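
A hypothetical test-side sketch of the effect of this change in the in-process test service: responses expose events only through getHistory(), and getRawHistory() is expected to stay unset. The class and method names below are illustrative, and the service and request setup are assumed to exist elsewhere in the test.

import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;

import com.uber.cadence.GetWorkflowExecutionHistoryRequest;
import com.uber.cadence.GetWorkflowExecutionHistoryResponse;
import com.uber.cadence.serviceclient.IWorkflowService;
import org.apache.thrift.TException;

public class RawHistoryRemovedSketch {

  // Assumes "service" is the in-process test service exposed as an IWorkflowService
  // and "request" targets an existing workflow execution.
  void verifyNoRawHistory(IWorkflowService service, GetWorkflowExecutionHistoryRequest request)
      throws TException {
    GetWorkflowExecutionHistoryResponse response = service.GetWorkflowExecutionHistory(request);
    assertNotNull(response.getHistory());
    assertNull(response.getRawHistory());
  }
}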