-
Notifications
You must be signed in to change notification settings - Fork 193
/
Copy pathmssqlqueries.py
186 lines (167 loc) · 4.95 KB
/
mssqlqueries.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
import sys
import re
def get_schemas():
    """
    Build the query that lists the name of every row in sys.schemas,
    sorted by that name.
    :return: string
    """
    query = '''
SELECT name
FROM sys.schemas
ORDER BY 1'''
    # normalize() flattens the multi-line literal into a single line.
    return normalize(query)
def get_databases():
    """
    Build the query that lists the name of every row in sys.databases,
    sorted by that name.
    :return: string
    """
    # The literal is handed straight to normalize(), which collapses it
    # onto one line before it is returned.
    return normalize('''
Select name
FROM sys.databases
ORDER BY 1''')
def get_table_columns():
    """
    Build the query that lists the columns of base tables.

    INFORMATION_SCHEMA.COLUMNS is inner-joined (on schema and table name)
    to the INFORMATION_SCHEMA.TABLES rows whose TABLE_TYPE is 'BASE TABLE'.
    Each row carries schema, table, column, data type and column default;
    rows are ordered by the first two selected columns.
    :return: string
    """
    query = '''
SELECT isc.table_schema,
isc.table_name,
isc.column_name,
isc.data_type,
isc.column_default
FROM
(
SELECT table_schema,
table_name,
column_name,
data_type,
column_default
FROM INFORMATION_SCHEMA.COLUMNS
) AS isc
INNER JOIN
(
SELECT table_schema,
table_name
FROM INFORMATION_SCHEMA.TABLES
WHERE TABLE_TYPE = 'BASE TABLE'
) AS ist
ON ist.table_name = isc.table_name AND ist.table_schema = isc.table_schema
ORDER BY 1, 2'''
    # Flatten the multi-line literal into a single-line statement.
    return normalize(query)
def get_view_columns():
    """
    Build the query that lists the columns of views.

    INFORMATION_SCHEMA.COLUMNS is inner-joined (on schema and table name)
    to the INFORMATION_SCHEMA.TABLES rows whose TABLE_TYPE is 'VIEW'.
    Each row carries schema, view name, column, data type and column
    default; rows are ordered by the first two selected columns.
    :return: string
    """
    query = '''
SELECT isc.table_schema,
isc.table_name,
isc.column_name,
isc.data_type,
isc.column_default
FROM
(
SELECT table_schema,
table_name,
column_name,
data_type,
column_default
FROM INFORMATION_SCHEMA.COLUMNS
) AS isc
INNER JOIN
(
SELECT table_schema,
table_name
FROM INFORMATION_SCHEMA.TABLES
WHERE TABLE_TYPE = 'VIEW'
) AS ist
ON ist.table_name = isc.table_name AND ist.table_schema = isc.table_schema
ORDER BY 1, 2'''
    # Flatten the multi-line literal into a single-line statement.
    return normalize(query)
def get_views():
    """
    Build the query that lists schema and name for every row in
    INFORMATION_SCHEMA.VIEWS, ordered by schema then name.
    :return: string
    """
    # The literal is handed straight to normalize(), which collapses it
    # onto one line before it is returned.
    return normalize('''
SELECT table_schema,
table_name
FROM INFORMATION_SCHEMA.VIEWS
ORDER BY 1, 2''')
def get_tables():
    """
    Build the query that lists schema and name for every
    INFORMATION_SCHEMA.TABLES row whose table_type is 'BASE TABLE',
    ordered by schema then name.
    :return: string
    """
    query = '''
SELECT table_schema,
table_name
FROM INFORMATION_SCHEMA.TABLES
WHERE table_type = 'BASE TABLE'
ORDER BY 1, 2'''
    # Flatten the multi-line literal into a single-line statement.
    return normalize(query)
def get_user_defined_types():
    """
    Build the query that lists user defined types with their schema.

    Rows of sys.types having is_user_defined = 1 are inner-joined to
    sys.schemas on schema_id; the schema name and the type name are
    selected. The statement has no ORDER BY clause.
    :return: string
    """
    query = '''
SELECT schemas.name,
types.name
FROM
(
SELECT name,
schema_id
FROM sys.types
WHERE is_user_defined = 1) AS types
INNER JOIN
(
SELECT name,
schema_id
FROM sys.schemas) AS schemas
ON types.schema_id = schemas.schema_id'''
    # Flatten the multi-line literal into a single-line statement.
    return normalize(query)
def get_functions():
    """
    Build the query that lists specific_schema and specific_name for
    every row in INFORMATION_SCHEMA.ROUTINES, ordered by schema then
    name.
    :return: string
    """
    # The literal is handed straight to normalize(), which collapses it
    # onto one line before it is returned.
    return normalize('''
SELECT specific_schema, specific_name
FROM INFORMATION_SCHEMA.ROUTINES
ORDER BY 1, 2''')
def get_foreignkeys():
    """
    Build the query that pairs foreign key columns with the columns
    they reference.

    INFORMATION_SCHEMA.REFERENTIAL_CONSTRAINTS is joined twice to
    INFORMATION_SCHEMA.KEY_COLUMN_USAGE: once through the constraint
    itself (kcu1, the referencing side) and once through the unique
    constraint it points at (kcu2, the referenced side), with both sides
    matched on ordinal_position. Each row carries schema, table and
    column for both sides; rows are ordered by the third and fourth
    selected columns (fk_column_name, referenced_table_schema).
    :return: string
    """
    query = '''
SELECT
kcu1.table_schema AS fk_table_schema,
kcu1.table_name AS fk_table_name,
kcu1.column_name AS fk_column_name,
kcu2.table_schema AS referenced_table_schema,
kcu2.table_name AS referenced_table_name,
kcu2.column_name AS referenced_column_name
FROM INFORMATION_SCHEMA.REFERENTIAL_CONSTRAINTS AS rc
INNER JOIN INFORMATION_SCHEMA.KEY_COLUMN_USAGE AS kcu1
ON kcu1.constraint_catalog = rc.constraint_catalog
AND kcu1.constraint_schema = rc.constraint_schema
AND kcu1.constraint_name = rc.constraint_name
INNER JOIN INFORMATION_SCHEMA.KEY_COLUMN_USAGE AS kcu2
ON kcu2.constraint_catalog = rc.unique_constraint_catalog
AND kcu2.constraint_schema = rc.unique_constraint_schema
AND kcu2.constraint_name = rc.unique_constraint_name
AND kcu2.ordinal_position = kcu1.ordinal_position
ORDER BY 3, 4'''
    # Flatten the multi-line literal into a single-line statement.
    return normalize(query)
def normalize(sql):
    """
    Flatten a query string onto a single line.

    Carriage returns and newlines become spaces, leading and trailing
    whitespace is stripped, and each run of spaces is collapsed to one
    space. Other whitespace inside the text (e.g. tabs) is left alone.
    An empty string or None is returned unchanged.
    :param sql: query text, or None
    :return: string (decoded from UTF-8 when running on Python 2)
    """
    # Nothing to clean up for None / empty input; hand it back as-is.
    if sql is None or sql == '':
        return sql

    flattened = sql
    for line_break in ('\r', '\n'):
        flattened = flattened.replace(line_break, ' ')

    collapsed = re.sub(r' +', ' ', flattened.strip())

    # On Python 2 the result is decoded from UTF-8 before being returned.
    if sys.version_info[0] < 3:
        return collapsed.decode('utf-8')
    return collapsed