Source code for sqlconvert.process_mysql
from sqlparse.sql import Comment, Function, Identifier, Parenthesis, \
Statement, Token
from sqlparse import tokens as T
from .process_tokens import escape_strings, is_comment_or_space
def _is_directive_token(token):
    """Return True if *token* is a MySQL executable comment (``/*! ... */``)."""
    if not isinstance(token, Comment):
        return False
    subtokens = token.tokens
    if not subtokens:
        return False
    first = subtokens[0]
    return (first.ttype is T.Comment.Multiline
            and first.value.startswith('/*!'))
def is_directive_statement(statement):
    """Return True when the whole statement is a single MySQL directive
    comment followed (apart from whitespace) only by its closing ';'."""
    tokens = statement.tokens
    if not _is_directive_token(tokens[0]):
        return False
    last = tokens[-1]
    if last.ttype is not T.Punctuation or last.value != ';':
        return False
    # Everything between the directive and the semicolon must be blank.
    return all(token.ttype in (T.Newline, T.Whitespace)
               for token in tokens[1:-1])
def remove_directive_tokens(statement):
    """Remove ``/*! directive */`` comments from the statement's first level.

    Mutates ``statement.tokens`` in place; directives nested deeper in the
    token tree are left untouched.
    """
    # Idiomatic filter instead of the manual append loop.
    statement.tokens = [token for token in statement.tokens
                        if not _is_directive_token(token)]
def requote_names(token_list):
    """Strip MySQL backticks from identifiers; wrap any identifier that is
    not all-lowercase in double quotes."""
    for tok in token_list.flatten():
        if tok.ttype is not T.Name:
            continue
        name = tok.value
        if name[0] == '`' == name[-1]:
            name = name[1:-1]
        quoted = name if name.islower() else '"%s"' % name
        tok.normalized = tok.value = quoted
def unescape_strings(token_list):
    """Turn MySQL escape sequences inside single-quoted string tokens into
    their literal characters."""
    # Order matters: the '\\\\' -> '\\' rule must run last so the earlier
    # rules never see backslashes it produces.
    replacements = (
        ('\\"', '"'),
        ("\\'", "'"),
        ("''", "'"),
        ('\\b', '\b'),
        ('\\n', '\n'),
        ('\\r', '\r'),
        ('\\t', '\t'),
        ('\\\032', '\032'),
        ('\\\\', '\\'),
    )
    for tok in token_list.flatten():
        if tok.ttype is not T.String.Single:
            continue
        text = tok.value
        for escaped, literal in replacements:
            text = text.replace(escaped, literal)
        tok.normalized = tok.value = text
def is_insert(statement):
    """Return True when the first meaningful token of *statement* is the
    INSERT keyword.

    Leading comments and whitespace are skipped.  Returns False for a
    statement containing only comments/whitespace (the previous version
    fell off the loop and implicitly returned None).
    """
    for token in statement.tokens:
        if is_comment_or_space(token):
            continue
        return (token.ttype is T.DML) and (token.normalized == 'INSERT')
    return False  # no meaningful tokens at all
def split_ext_insert(statement):
    """Split extended INSERT into multiple standard INSERTs.

    MySQL dumps emit ``INSERT INTO t VALUES (...), (...), ...;``.  This
    generator yields one Statement per value tuple, each reusing the same
    ``INSERT INTO <table> VALUES`` prefix tokens and the trailing ';'.

    Raises ValueError if the statement does not match that shape.
    """
    insert_tokens = []  # shared "INSERT INTO <table> VALUES" prefix tokens
    values_tokens = []  # one Parenthesis token per row of values
    end_tokens = []     # the closing ';' plus anything after it
    # State machine: `expected` names the next token kind we will accept.
    expected = 'INSERT'
    for token in statement.tokens:
        if is_comment_or_space(token):
            # Comments/whitespace after the ';' stay at the end; earlier
            # ones are kept with the INSERT prefix.
            if expected == 'END':
                end_tokens.append(token)
            else:
                insert_tokens.append(token)
            continue
        elif expected == 'INSERT':
            if (token.ttype is T.DML) and (token.normalized == 'INSERT'):
                insert_tokens.append(token)
                expected = 'INTO'
                continue
        elif expected == 'INTO':
            if (token.ttype is T.Keyword) and (token.normalized == 'INTO'):
                insert_tokens.append(token)
                expected = 'TABLE_NAME'
                continue
        elif expected == 'TABLE_NAME':
            # sqlparse parses the table name (possibly with a column list)
            # as a Function or a plain Identifier.
            if isinstance(token, (Function, Identifier)):
                insert_tokens.append(token)
                expected = 'VALUES'
                continue
        elif expected == 'VALUES':
            if (token.ttype is T.Keyword) and (token.normalized == 'VALUES'):
                insert_tokens.append(token)
                expected = 'VALUES_OR_SEMICOLON'
                continue
        elif expected == 'VALUES_OR_SEMICOLON':
            if isinstance(token, Parenthesis):
                values_tokens.append(token)
                continue
            elif token.ttype is T.Punctuation:
                if token.value == ',':
                    # Separator between value tuples — drop it.
                    continue
                elif token.value == ';':
                    end_tokens.append(token)
                    expected = 'END'
                    continue
        # Any token that fell through the state machine is a syntax error.
        raise ValueError(
            'SQL syntax error: expected "%s", got %s "%s"' % (
                expected, token.ttype, token.normalized))
    new_line = Token(T.Newline, '\n')
    new_lines = [new_line]  # Insert newlines between split statements
    for i, values in enumerate(values_tokens):
        if i == len(values_tokens) - 1:  # Last statement
            # Insert newlines only between split statements but not after
            new_lines = []
        # The Statement constructor sets the `parent` attribute of every
        # token to itself, but we don't care.
        statement = Statement(insert_tokens + [values] +
                              end_tokens + new_lines)
        yield statement
def process_statement(statement, quoting_style='sqlite'):
    """Normalize one MySQL statement for the target dialect and yield the
    result.

    Applies identifier requoting, string unescaping/re-escaping and
    directive removal in place; extended INSERTs are split and yielded one
    per value tuple, everything else is yielded unchanged (once).
    """
    requote_names(statement)
    unescape_strings(statement)
    remove_directive_tokens(statement)
    escape_strings(statement, quoting_style)
    if not is_insert(statement):
        yield statement
    else:
        for split_stmt in split_ext_insert(statement):
            yield split_stmt