Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions src/containers/Tenant/ObjectSummary/SchemaTree/SchemaTree.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import {
} from '../../../../store/reducers/capabilities/hooks';
import {selectIsDirty, selectUserInput} from '../../../../store/reducers/query/query';
import {schemaApi} from '../../../../store/reducers/schema/schema';
import {streamingQueriesApi} from '../../../../store/reducers/streamingQuery/streamingQuery';
import {tableSchemaDataApi} from '../../../../store/reducers/tableSchemaData';
import type {EPathType, TEvDescribeSchemeResult} from '../../../../types/api/schema';
import {uiFactory} from '../../../../uiFactory/uiFactory';
Expand All @@ -22,6 +23,7 @@ import {getSchemaControls} from '../../utils/controls';
import {
isChildlessPathType,
mapPathTypeToNavigationTreeType,
nodeStreamingQueryTypeToPathType,
nodeTableTypeToPathType,
} from '../../utils/schema';
import {getActions} from '../../utils/schemaActions';
Expand Down Expand Up @@ -49,6 +51,10 @@ export function SchemaTree(props: SchemaTreeProps) {
getTableSchemaDataQuery,
{currentData: actionsSchemaData, isFetching: isActionsDataFetching},
] = tableSchemaDataApi.useLazyGetTableSchemaDataQuery();
const [
getStreamingQueryInfo,
{currentData: streamingSysData, isFetching: isStreamingInfoFetching},
] = streamingQueriesApi.useLazyGetStreamingQueryInfoQuery();

const isTopicPreviewAvailable = useTopicDataAvailable();

Expand Down Expand Up @@ -146,6 +152,8 @@ export function SchemaTree(props: SchemaTreeProps) {
schemaData: actionsSchemaData,
isSchemaDataLoading: isActionsDataFetching,
hasMonitoring: typeof uiFactory.renderMonitoring === 'function',
streamingQueryData: streamingSysData,
isStreamingQueryTextLoading: isStreamingInfoFetching,
},
databaseFullPath,
database,
Expand All @@ -160,6 +168,8 @@ export function SchemaTree(props: SchemaTreeProps) {
onActivePathUpdate,
databaseFullPath,
database,
streamingSysData,
isStreamingInfoFetching,
]);

return (
Expand Down Expand Up @@ -188,6 +198,11 @@ export function SchemaTree(props: SchemaTreeProps) {
getTableSchemaDataQuery({path, database, type: pathType, databaseFullPath});
}

const streamingPathType = nodeStreamingQueryTypeToPathType[type];
if (isOpen && streamingPathType) {
getStreamingQueryInfo({database, path});
}

return [];
}}
renderAdditionalNodeElements={getSchemaControls(
Expand Down
4 changes: 4 additions & 0 deletions src/containers/Tenant/i18n/en.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
"actions.createAsyncReplication": "Create async replication...",
"actions.createTransfer": "Create transfer...",
"actions.createView": "Create view...",
"actions.createStreamingQuery": "Create streaming query...",
"actions.dropTable": "Drop table...",
"actions.dropTopic": "Drop topic...",
"actions.dropView": "Drop view...",
Expand All @@ -51,6 +52,9 @@
"actions.alterTransfer": "Alter transfer...",
"actions.dropReplication": "Drop async replicaton...",
"actions.dropTransfer": "Drop transfer...",
"actions.dropStreamingQuery": "Drop query...",
"actions.alterStreamingQuerySettings": "Alter query settings...",
"actions.alterStreamingQueryText": "Alter query text...",
"actions.createDirectory": "Create directory",
"schema.tree.dialog.placeholder": "Relative path",
"schema.tree.dialog.invalid": "Invalid path",
Expand Down
5 changes: 5 additions & 0 deletions src/containers/Tenant/utils/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ export const nodeTableTypeToPathType: Partial<Record<NavigationTreeNodeType, EPa
view: EPathType.EPathTypeView,
};

export const nodeStreamingQueryTypeToPathType: Partial<Record<NavigationTreeNodeType, EPathType>> =
{
streaming_query: EPathType.EPathTypeStreamingQuery,
};

export const mapPathTypeToNavigationTreeType = (
type: EPathType = EPathType.EPathTypeDir,
subType?: EPathSubType,
Expand Down
35 changes: 33 additions & 2 deletions src/containers/Tenant/utils/schemaActions.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import {
TENANT_QUERY_TABS_ID,
} from '../../../store/reducers/tenant/constants';
import {setDiagnosticsTab, setQueryTab, setTenantPage} from '../../../store/reducers/tenant/tenant';
import type {IQueryResult} from '../../../types/store/query';
import createToast from '../../../utils/createToast';
import {insertSnippetToEditor} from '../../../utils/monaco/insertSnippet';
import {transformPath} from '../ObjectSummary/transformPath';
Expand All @@ -21,19 +22,23 @@ import type {TemplateFn} from './schemaQueryTemplates';
import {
addTableIndex,
alterAsyncReplicationTemplate,
alterStreamingQuerySettingsTemplate,
alterStreamingQueryText,
alterTableTemplate,
alterTopicTemplate,
alterTransferTemplate,
createAsyncReplicationTemplate,
createCdcStreamTemplate,
createColumnTableTemplate,
createExternalTableTemplate,
createStreamingQueryTemplate,
createTableTemplate,
createTopicTemplate,
createTransferTemplate,
createViewTemplate,
dropAsyncReplicationTemplate,
dropExternalTableTemplate,
dropStreamingQueryTemplate,
dropTableIndex,
dropTableTemplate,
dropTopicTemplate,
Expand All @@ -53,6 +58,8 @@ interface ActionsAdditionalParams {
schemaData?: SchemaData[];
isSchemaDataLoading?: boolean;
hasMonitoring?: boolean;
streamingQueryData?: IQueryResult;
isStreamingQueryTextLoading?: boolean;
}

interface BindActionParams {
Expand All @@ -74,6 +81,7 @@ const bindActions = (
getConfirmation,
getConnectToDBDialog,
schemaData,
streamingQueryData,
} = additionalEffects;

const inputQuery = (tmpl: TemplateFn) => () => {
Expand All @@ -82,7 +90,7 @@ const bindActions = (
dispatch(setTenantPage(TENANT_PAGES_IDS.query));
dispatch(setQueryTab(TENANT_QUERY_TABS_ID.newQuery));
setActivePath(params.path);
insertSnippetToEditor(tmpl({...params, schemaData}));
insertSnippetToEditor(tmpl({...params, schemaData, streamingQueryData}));
};
if (getConfirmation) {
const confirmedPromise = getConfirmation();
Expand Down Expand Up @@ -129,6 +137,10 @@ const bindActions = (
dropTopic: inputQuery(dropTopicTemplate),
createView: inputQuery(createViewTemplate),
dropView: inputQuery(dropViewTemplate),
createStreamingQuery: inputQuery(createStreamingQueryTemplate),
alterStreamingQuerySettings: inputQuery(alterStreamingQuerySettingsTemplate),
alterStreamingQueryText: inputQuery(alterStreamingQueryText),
dropStreamingQuery: inputQuery(dropStreamingQueryTemplate),
dropIndex: inputQuery(dropTableIndex),
addTableIndex: inputQuery(addTableIndex),
createCdcStream: inputQuery(createCdcStreamTemplate),
Expand Down Expand Up @@ -219,6 +231,7 @@ export const getActions =
},
{text: i18n('actions.createTopic'), action: actions.createTopic},
{text: i18n('actions.createView'), action: actions.createView},
{text: i18n('actions.createStreamingQuery'), action: actions.createStreamingQuery},
];

const alterTableGroupItem = {
Expand Down Expand Up @@ -334,6 +347,24 @@ export const getActions =
[copyItem, {text: i18n('actions.dropIndex'), action: actions.dropIndex}],
];

const STREAMING_QUERY_SET: ActionsSet = [
[copyItem],
[
{
text: i18n('actions.alterStreamingQuerySettings'),
action: actions.alterStreamingQuerySettings,
},
{
text: i18n('actions.alterStreamingQueryText'),
action: actions.alterStreamingQueryText,
},
{
text: i18n('actions.dropStreamingQuery'),
action: actions.dropStreamingQuery,
},
],
];

const JUST_COPY: ActionsSet = [copyItem];

// verbose mapping to guarantee a correct actions set for new node types
Expand Down Expand Up @@ -362,7 +393,7 @@ export const getActions =

view: VIEW_SET,

streaming_query: JUST_COPY,
streaming_query: STREAMING_QUERY_SET,
};

return nodeTypeToActions[type];
Expand Down
61 changes: 61 additions & 0 deletions src/containers/Tenant/utils/schemaQueryTemplates.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import type {IQueryResult} from '../../../types/store/query';
import {getStringifiedData} from '../../../utils/dataFormatters/dataFormatters';
import type {SchemaData} from '../Schema/SchemaViewer/types';

export interface SchemaQueryParams {
path: string;
relativePath: string;
schemaData?: SchemaData[];
streamingQueryData?: IQueryResult;
}

export type TemplateFn = (params?: SchemaQueryParams) => string;
Expand All @@ -12,6 +15,14 @@ function normalizeParameter(param: string) {
return param.replace(/\$/g, '\\$');
}

function toLF(s: string) {
return s.replace(/\r\n?/g, '\n');
}

function indentBlock(s: string, pad = ' ') {
return s.replace(/^/gm, pad);
}

export const createTableTemplate = (params?: SchemaQueryParams) => {
const tableName = params?.relativePath
? `\`${normalizeParameter(params.relativePath)}/my_row_table\``
Expand Down Expand Up @@ -298,6 +309,56 @@ ALTER TRANSFER ${path}
SET USING \\$l;`;
};

export const createStreamingQueryTemplate = (params?: SchemaQueryParams) => {
const streamingQueryName = params?.relativePath
? `\`${normalizeParameter(params.relativePath)}/my_streaming_query\``
: '${1:<my_streaming_query>}';
return `CREATE STREAMING QUERY ${streamingQueryName} WITH (
RUN = TRUE -- Run the query after creation
) AS
DO BEGIN
INSERT INTO \${2:<external data source>}.\${3:<sink topic>}
SELECT * FROM \${2:<external data source>}.\${4:<source topic>};
END DO;`;
};

export const alterStreamingQuerySettingsTemplate = (params?: SchemaQueryParams) => {
const streamingQueryName = params?.relativePath
? `\`${normalizeParameter(params.relativePath)}\``
: '${1:<my_streaming_query>}';
return `ALTER STREAMING QUERY ${streamingQueryName} SET (
RUN = FALSE, -- Stop query execution
RESOURCE_POOL = "default" -- Workload manager pool for query
);`;
};

export const alterStreamingQueryText = (params?: SchemaQueryParams) => {
const streamingQueryName = params?.relativePath
? `\`${normalizeParameter(params.relativePath)}\``
: '${1:<my_streaming_query>}';

const sysData = params?.streamingQueryData;
const rawQueryText = getStringifiedData(sysData?.resultSets?.[0]?.result?.[0]?.Text);
const normalizedQueryText = normalizeParameter(toLF(rawQueryText).trim());

const bodyQueryText = normalizedQueryText
? indentBlock(normalizedQueryText)
: '$2{{ current query text}}';
return `ALTER STREAMING QUERY ${streamingQueryName} SET (
FORCE = TRUE, -- Allow to drop last query checkpoint if query state can't be loaded
) AS
DO BEGIN
${bodyQueryText}
END DO;`;
};

export const dropStreamingQueryTemplate = (params?: SchemaQueryParams) => {
const streamingQueryName = params?.relativePath
? `\`${normalizeParameter(params.relativePath)}\``
: '${1:<my_streaming_query>}';
return `DROP STREAMING QUERY ${streamingQueryName};`;
};

export const addTableIndex = (params?: SchemaQueryParams) => {
const path = params?.relativePath
? `\`${normalizeParameter(params.relativePath)}\``
Expand Down
46 changes: 46 additions & 0 deletions src/store/reducers/streamingQuery/streamingQuery.ts
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As for this file, it is transferred from my previous PR - #3060

Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import {QUERY_TECHNICAL_MARK} from '../../../utils/constants';
import {isQueryErrorResponse, parseQueryAPIResponse} from '../../../utils/query';
import {api} from '../api';

function getStreamingQueryInfoSQL(path: string) {
const safePath = path.replace(/'/g, "''");
return `${QUERY_TECHNICAL_MARK}
SELECT
Status AS State,
Issues AS Error,
Text
FROM \`.sys/streaming_queries\`
WHERE Path = '${safePath}'
LIMIT 1`;
}

export const streamingQueriesApi = api.injectEndpoints({
endpoints: (build) => ({
getStreamingQueryInfo: build.query({
queryFn: async ({database, path}: {database: string; path: string}, {signal}) => {
try {
const response = await window.api.viewer.sendQuery(
{
query: getStreamingQueryInfoSQL(path),
database,
action: 'execute-scan',
internal_call: true,
},
{signal, withRetries: true},
);

if (isQueryErrorResponse(response)) {
return {error: response};
}

const data = parseQueryAPIResponse(response);
return {data};
} catch (error) {
return {error};
}
},
providesTags: ['All'],
}),
}),
overrideExisting: 'throw',
});
Loading