Adding the south schema migration app (http://south.aeracode.org/).

- Legacy-Id: 2118

Author: Henrik Levkowetz, 2010-03-20 21:32:07 +00:00
Parent: b95fe436e2
Commit: 58a6dc0b85

60 changed files with 6003 additions and 0 deletions

ietf/settings.py

@@ -123,6 +123,7 @@ INSTALLED_APPS = (
'django.contrib.sitemaps',
'django.contrib.admin',
'django.contrib.humanize',
'south',
'ietf.announcements',
'ietf.idindex',
'ietf.idtracker',
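Besides the INSTALLED_APPS entry, South reads a handful of optional settings, all of which appear in the code added below. A minimal sketch of what they might look like in settings.py (the setting names come from this commit; the values are illustrative):

    # Optional South settings; names taken from the code in this commit.
    SOUTH_DATABASE_ADAPTER = "south.db.postgresql_psycopg2"  # overrides the south.db.<DATABASE_ENGINE> lookup
    SOUTH_LOGGING_ON = True                    # turn on the 'south' debug logger
    SOUTH_LOGGING_FILE = "/var/log/south.log"  # required when SOUTH_LOGGING_ON is True
    SOUTH_TESTS_MIGRATE = False                # plain syncdb instead of migrations when testing
    SOUTH_USE_PYC = False                      # also accept .pyc migration files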

south/.gitignore (vendored, new file, 1 line)

@@ -0,0 +1 @@
/*.pyc

south/__init__.py (new file, 6 lines)

@@ -0,0 +1,6 @@
"""
South - Usable migrations for Django apps
"""
__version__ = "0.6.2"
__authors__ = ["Andrew Godwin <andrew@aeracode.org>", "Andy McCurdy <andy@andymccurdy.com>"]

south/db/.gitignore (vendored, new file, 1 line)

@@ -0,0 +1 @@
/*.pyc

south/db/__init__.py (new file, 17 lines)

@@ -0,0 +1,17 @@
# Establish the common DatabaseOperations instance, which we call 'db'.
# This code is somewhat lifted from Django Evolution.
from django.conf import settings
import sys
if hasattr(settings, "SOUTH_DATABASE_ADAPTER"):
module_name = settings.SOUTH_DATABASE_ADAPTER
else:
module_name = '.'.join(['south.db', settings.DATABASE_ENGINE])
try:
module = __import__(module_name,{},{},[''])
except ImportError:
sys.stderr.write("There is no South database module for the engine '%s' (tried with %s). Please either choose a supported one, or check for SOUTH_DATABASE_ADAPTER settings, or remove South from INSTALLED_APPS.\n"
% (settings.DATABASE_ENGINE, module_name))
sys.exit(1)
db = module.DatabaseOperations()
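For example, with the single-database Django settings of this era, the lookup above resolves as follows; a sketch, assuming DATABASE_ENGINE is set to 'mysql':

    # Illustrative: how the adapter module name is derived from settings.
    DATABASE_ENGINE = "mysql"  # as set in settings.py
    module_name = '.'.join(['south.db', DATABASE_ENGINE])
    assert module_name == 'south.db.mysql'  # i.e. south/db/mysql.py
    # 'from south.db import db' then yields that module's DatabaseOperations instance.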

south/db/generic.py (new file, 770 lines)

@@ -0,0 +1,770 @@
import datetime
import string
import random
import re
import sys
from django.core.management.color import no_style
from django.db import connection, transaction, models
from django.db.backends.util import truncate_name
from django.db.models.fields import NOT_PROVIDED
from django.dispatch import dispatcher
from django.conf import settings
from django.utils.datastructures import SortedDict
from south.logger import get_logger
def alias(attrname):
"""
Returns a function which calls 'attrname' - for function aliasing.
We can't just use foo = bar, as this breaks subclassing.
"""
def func(self, *args, **kwds):
return getattr(self, attrname)(*args, **kwds)
return func
class DatabaseOperations(object):
"""
Generic SQL implementation of the DatabaseOperations.
Some of this code comes from Django Evolution.
"""
# We assume the generic DB can handle DDL transactions. MySQL will change this.
has_ddl_transactions = True
alter_string_set_type = 'ALTER COLUMN %(column)s TYPE %(type)s'
alter_string_set_null = 'ALTER COLUMN %(column)s DROP NOT NULL'
alter_string_drop_null = 'ALTER COLUMN %(column)s SET NOT NULL'
has_check_constraints = True
delete_check_sql = 'ALTER TABLE %(table)s DROP CONSTRAINT %(constraint)s'
allows_combined_alters = True
add_column_string = 'ALTER TABLE %s ADD COLUMN %s;'
delete_unique_sql = "ALTER TABLE %s DROP CONSTRAINT %s"
delete_foreign_key_sql = 'ALTER TABLE %s DROP CONSTRAINT %s'
supports_foreign_keys = True
max_index_name_length = 63
drop_index_string = 'DROP INDEX %(index_name)s'
delete_column_string = 'ALTER TABLE %s DROP COLUMN %s CASCADE;'
create_primary_key_string = "ALTER TABLE %(table)s ADD CONSTRAINT %(constraint)s PRIMARY KEY (%(columns)s)"
drop_primary_key_string = "ALTER TABLE %(table)s DROP CONSTRAINT %(constraint)s"
backend_name = None
def __init__(self):
self.debug = False
self.deferred_sql = []
self.dry_run = False
self.pending_transactions = 0
self.pending_create_signals = []
def connection_init(self):
"""
Run before any SQL to let database-specific config be sent as a command,
e.g. which storage engine (MySQL) or transaction serialisability level.
"""
pass
def execute(self, sql, params=[]):
"""
Executes the given SQL statement, with optional parameters.
If the instance's debug attribute is True, prints out what it executes.
"""
self.connection_init()
cursor = connection.cursor()
if self.debug:
print " = %s" % sql, params
get_logger().debug('south execute "%s" with params "%s"' % (sql, params))
if self.dry_run:
return []
cursor.execute(sql, params)
try:
return cursor.fetchall()
except:
return []
def execute_many(self, sql, regex=r"(?mx) ([^';]* (?:'[^']*'[^';]*)*)", comment_regex=r"(?mx) (?:^\s*$)|(?:--.*$)"):
"""
Takes an SQL file and executes it as multiple separate statements.
(Some backends, such as Postgres, don't work otherwise.)
"""
# Be warned: This function is full of dark magic. Make sure you really
# know regexes before trying to edit it.
# First, strip comments
sql = "\n".join([x.strip().replace("%", "%%") for x in re.split(comment_regex, sql) if x.strip()])
# Now execute each statement
for st in re.split(regex, sql)[1:][::2]:
self.execute(st)
def add_deferred_sql(self, sql):
"""
Adds an SQL statement to the deferred list; it won't be executed until
this instance's execute_deferred_sql method is run.
"""
self.deferred_sql.append(sql)
def execute_deferred_sql(self):
"""
Executes all deferred SQL, resetting the deferred_sql list
"""
for sql in self.deferred_sql:
self.execute(sql)
self.deferred_sql = []
def clear_deferred_sql(self):
"""
Resets the deferred_sql list to empty.
"""
self.deferred_sql = []
def clear_run_data(self, pending_creates = None):
"""
Resets variables to how they should be before a run. Used for dry runs.
If you want, pass in an old pending_creates to reset to.
"""
self.clear_deferred_sql()
self.pending_create_signals = pending_creates or []
def get_pending_creates(self):
return self.pending_create_signals
def create_table(self, table_name, fields):
"""
Creates the table 'table_name'. 'fields' is a tuple of fields,
each represented by a 2-part tuple of field name and a
django.db.models.fields.Field object
"""
qn = connection.ops.quote_name
# allow fields to be a dictionary
# removed for now - philosophical reasons (this is almost certainly not what you want)
#try:
# fields = fields.items()
#except AttributeError:
# pass
if len(table_name) > 63:
print " ! WARNING: You have a table name longer than 63 characters; this will not fully work on PostgreSQL or MySQL."
columns = [
self.column_sql(table_name, field_name, field)
for field_name, field in fields
]
self.execute('CREATE TABLE %s (%s);' % (qn(table_name), ', '.join([col for col in columns if col])))
add_table = alias('create_table') # Alias for consistency's sake
def rename_table(self, old_table_name, table_name):
"""
Renames the table 'old_table_name' to 'table_name'.
"""
if old_table_name == table_name:
# No Operation
return
qn = connection.ops.quote_name
params = (qn(old_table_name), qn(table_name))
self.execute('ALTER TABLE %s RENAME TO %s;' % params)
def delete_table(self, table_name, cascade=True):
"""
Deletes the table 'table_name'.
"""
qn = connection.ops.quote_name
params = (qn(table_name), )
if cascade:
self.execute('DROP TABLE %s CASCADE;' % params)
else:
self.execute('DROP TABLE %s;' % params)
drop_table = alias('delete_table')
def clear_table(self, table_name):
"""
Deletes all rows from 'table_name'.
"""
qn = connection.ops.quote_name
params = (qn(table_name), )
self.execute('DELETE FROM %s;' % params)
def add_column(self, table_name, name, field, keep_default=True):
"""
Adds the column 'name' to the table 'table_name'.
Uses the 'field' parameter, a django.db.models.fields.Field instance,
to generate the necessary SQL.
@param table_name: The name of the table to add the column to
@param name: The name of the column to add
@param field: The field to use
"""
qn = connection.ops.quote_name
sql = self.column_sql(table_name, name, field)
if sql:
params = (
qn(table_name),
sql,
)
sql = self.add_column_string % params
self.execute(sql)
# Now, drop the default if we need to
if not keep_default and field.default is not None:
field.default = NOT_PROVIDED
self.alter_column(table_name, name, field, explicit_name=False)
def _db_type_for_alter_column(self, field):
"""
Returns a field's type suitable for ALTER COLUMN.
By default it just returns field.db_type().
To be overridden by backend-specific subclasses.
@param field: The field to generate type for
"""
return field.db_type()
def alter_column(self, table_name, name, field, explicit_name=True):
"""
Alters the given column name so it will match the given field.
Note that conversion between the two by the database must be possible.
Will not automatically add _id by default; to have this behaviour, pass
explicit_name=False.
@param table_name: The name of the table containing the column
@param name: The name of the column to alter
@param field: The new field definition to use
"""
# hook for the field to do any resolution prior to its attributes being queried
if hasattr(field, 'south_init'):
field.south_init()
qn = connection.ops.quote_name
# Add _id or whatever if we need to
field.set_attributes_from_name(name)
if not explicit_name:
name = field.column
# Drop all check constraints. TODO: Add the right ones back.
if self.has_check_constraints:
check_constraints = self._constraints_affecting_columns(table_name, [name], "CHECK")
for constraint in check_constraints:
self.execute(self.delete_check_sql % {'table': qn(table_name), 'constraint': qn(constraint)})
# First, change the type
params = {
"column": qn(name),
"type": self._db_type_for_alter_column(field)
}
# SQLs is a list of (SQL, values) pairs.
sqls = [(self.alter_string_set_type % params, [])]
# Next, set any default
if not field.null and field.has_default():
default = field.get_default()
sqls.append(('ALTER COLUMN %s SET DEFAULT %%s ' % (qn(name),), [default]))
else:
sqls.append(('ALTER COLUMN %s DROP DEFAULT' % (qn(name),), []))
# Next, nullity
params = {
"column": qn(name),
"type": field.db_type(),
}
if field.null:
sqls.append((self.alter_string_set_null % params, []))
else:
sqls.append((self.alter_string_drop_null % params, []))
# TODO: Unique
if self.allows_combined_alters:
sqls, values = zip(*sqls)
self.execute(
"ALTER TABLE %s %s;" % (qn(table_name), ", ".join(sqls)),
flatten(values),
)
else:
# Some databases, e.g. MySQL, don't like more than one ALTER at once.
for sql, values in sqls:
self.execute("ALTER TABLE %s %s;" % (qn(table_name), sql), values)
def _constraints_affecting_columns(self, table_name, columns, type="UNIQUE"):
"""
Gets the names of the constraints affecting the given columns.
"""
if self.dry_run:
raise ValueError("Cannot get constraints for columns during a dry run.")
columns = set(columns)
if type == "CHECK":
ifsc_table = "constraint_column_usage"
else:
ifsc_table = "key_column_usage"
# First, load all constraint->col mappings for this table.
rows = self.execute("""
SELECT kc.constraint_name, kc.column_name
FROM information_schema.%s AS kc
JOIN information_schema.table_constraints AS c ON
kc.table_schema = c.table_schema AND
kc.table_name = c.table_name AND
kc.constraint_name = c.constraint_name
WHERE
kc.table_schema = %%s AND
kc.table_name = %%s AND
c.constraint_type = %%s
""" % ifsc_table, ['public', table_name, type])
# Load into a dict
mapping = {}
for constraint, column in rows:
mapping.setdefault(constraint, set())
mapping[constraint].add(column)
# Find ones affecting these columns
for constraint, itscols in mapping.items():
if itscols == columns:
yield constraint
def create_unique(self, table_name, columns):
"""
Creates a UNIQUE constraint on the columns on the given table.
"""
qn = connection.ops.quote_name
if not isinstance(columns, (list, tuple)):
columns = [columns]
name = self.create_index_name(table_name, columns, suffix="_uniq")
cols = ", ".join(map(qn, columns))
self.execute("ALTER TABLE %s ADD CONSTRAINT %s UNIQUE (%s)" % (qn(table_name), qn(name), cols))
return name
def delete_unique(self, table_name, columns):
"""
Deletes a UNIQUE constraint on precisely the columns on the given table.
"""
qn = connection.ops.quote_name
if not isinstance(columns, (list, tuple)):
columns = [columns]
# Dry runs mean we can't do anything.
if self.dry_run:
return
constraints = list(self._constraints_affecting_columns(table_name, columns))
if not constraints:
raise ValueError("Cannot find a UNIQUE constraint on table %s, columns %r" % (table_name, columns))
for constraint in constraints:
self.execute(self.delete_unique_sql % (qn(table_name), qn(constraint)))
def column_sql(self, table_name, field_name, field, tablespace=''):
"""
Creates the SQL snippet for a column. Used by add_column and add_table.
"""
qn = connection.ops.quote_name
field.set_attributes_from_name(field_name)
# hook for the field to do any resolution prior to its attributes being queried
if hasattr(field, 'south_init'):
field.south_init()
# Possible hook to fiddle with the fields (e.g. defaults & TEXT on MySQL)
field = self._field_sanity(field)
sql = field.db_type()
if sql:
field_output = [qn(field.column), sql]
field_output.append('%sNULL' % (not field.null and 'NOT ' or ''))
if field.primary_key:
field_output.append('PRIMARY KEY')
elif field.unique:
# Just use UNIQUE (no indexes any more, we have delete_unique)
field_output.append('UNIQUE')
tablespace = field.db_tablespace or tablespace
if tablespace and connection.features.supports_tablespaces and field.unique:
# We must specify the index tablespace inline, because we
# won't be generating a CREATE INDEX statement for this field.
field_output.append(connection.ops.tablespace_sql(tablespace, inline=True))
sql = ' '.join(field_output)
sqlparams = ()
# if the field is "NOT NULL" and a default value is provided, create the column with it
# this allows the addition of a NOT NULL field to a table with existing rows
if not field.null and not getattr(field, '_suppress_default', False) and field.has_default():
default = field.get_default()
# If the default is actually None, don't add a default term
if default is not None:
# If the default is a callable, then call it!
if callable(default):
default = default()
# Now do some very cheap quoting. TODO: Redesign return values to avoid this.
if isinstance(default, basestring):
default = "'%s'" % default.replace("'", "''")
elif isinstance(default, (datetime.date, datetime.time, datetime.datetime)):
default = "'%s'" % default
sql += " DEFAULT %s"
sqlparams = (default,) # one-tuple, so the '%' formatting below is safe for any default value
elif (not field.null and field.blank) or ((field.get_default() == '') and (not getattr(field, '_suppress_default', False))):
if field.empty_strings_allowed and connection.features.interprets_empty_strings_as_nulls:
sql += " DEFAULT ''"
# Error here would be nice, but doesn't seem to play fair.
#else:
# raise ValueError("Attempting to add a non null column that isn't character based without an explicit default value.")
if field.rel and self.supports_foreign_keys:
self.add_deferred_sql(
self.foreign_key_sql(
table_name,
field.column,
field.rel.to._meta.db_table,
field.rel.to._meta.get_field(field.rel.field_name).column
)
)
if field.db_index and not field.unique:
self.add_deferred_sql(self.create_index_sql(table_name, [field.column]))
if hasattr(field, 'post_create_sql'):
style = no_style()
for stmt in field.post_create_sql(style, table_name):
self.add_deferred_sql(stmt)
if sql:
return sql % sqlparams
else:
return None
def _field_sanity(self, field):
"""
Placeholder for DBMS-specific field alterations (some combos aren't valid,
e.g. DEFAULT and TEXT on MySQL)
"""
return field
def foreign_key_sql(self, from_table_name, from_column_name, to_table_name, to_column_name):
"""
Generates a full SQL statement to add a foreign key constraint
"""
qn = connection.ops.quote_name
constraint_name = '%s_refs_%s_%x' % (from_column_name, to_column_name, abs(hash((from_table_name, to_table_name))))
return 'ALTER TABLE %s ADD CONSTRAINT %s FOREIGN KEY (%s) REFERENCES %s (%s)%s;' % (
qn(from_table_name),
qn(truncate_name(constraint_name, connection.ops.max_name_length())),
qn(from_column_name),
qn(to_table_name),
qn(to_column_name),
connection.ops.deferrable_sql() # Django knows this
)
def delete_foreign_key(self, table_name, column):
"Drop a foreign key constraint"
qn = connection.ops.quote_name
if self.dry_run:
return # We can't look at the DB to get the constraints
constraints = list(self._constraints_affecting_columns(table_name, [column], "FOREIGN KEY"))
if not constraints:
raise ValueError("Cannot find a FOREIGN KEY constraint on table %s, column %s" % (table_name, column))
for constraint_name in constraints:
self.execute(self.delete_foreign_key_sql % (qn(table_name), qn(constraint_name)))
drop_foreign_key = alias('delete_foreign_key')
def create_index_name(self, table_name, column_names, suffix=""):
"""
Generate a unique name for the index
"""
index_unique_name = ''
if len(column_names) > 1:
index_unique_name = '_%x' % abs(hash((table_name, ','.join(column_names))))
# If the index name is too long, truncate it
index_name = ('%s_%s%s%s' % (table_name, column_names[0], index_unique_name, suffix))
if len(index_name) > self.max_index_name_length:
part = ('_%s%s%s' % (column_names[0], index_unique_name, suffix))
index_name = '%s%s' % (table_name[:(self.max_index_name_length-len(part))], part)
return index_name
def create_index_sql(self, table_name, column_names, unique=False, db_tablespace=''):
"""
Generates a create index statement on 'table_name' for a list of 'column_names'
"""
qn = connection.ops.quote_name
if not column_names:
print "No column names supplied on which to create an index"
return ''
if db_tablespace and connection.features.supports_tablespaces:
tablespace_sql = ' ' + connection.ops.tablespace_sql(db_tablespace)
else:
tablespace_sql = ''
index_name = self.create_index_name(table_name, column_names)
qn = connection.ops.quote_name
return 'CREATE %sINDEX %s ON %s (%s)%s;' % (
unique and 'UNIQUE ' or '',
qn(index_name),
qn(table_name),
','.join([qn(field) for field in column_names]),
tablespace_sql
)
def create_index(self, table_name, column_names, unique=False, db_tablespace=''):
""" Executes a create index statement """
sql = self.create_index_sql(table_name, column_names, unique, db_tablespace)
self.execute(sql)
def delete_index(self, table_name, column_names, db_tablespace=''):
"""
Deletes an index created with create_index.
This is possible using only columns due to the deterministic
index naming function which relies on column names.
"""
if isinstance(column_names, (str, unicode)):
column_names = [column_names]
name = self.create_index_name(table_name, column_names)
qn = connection.ops.quote_name
sql = self.drop_index_string % {"index_name": qn(name), "table_name": qn(table_name)}
self.execute(sql)
drop_index = alias('delete_index')
def delete_column(self, table_name, name):
"""
Deletes the column 'name' from the table 'table_name'.
"""
qn = connection.ops.quote_name
params = (qn(table_name), qn(name))
self.execute(self.delete_column_string % params, [])
drop_column = alias('delete_column')
def rename_column(self, table_name, old, new):
"""
Renames the column 'old' from the table 'table_name' to 'new'.
"""
raise NotImplementedError("rename_column has no generic SQL syntax")
def drop_primary_key(self, table_name):
"""
Drops the old primary key.
"""
qn = connection.ops.quote_name
self.execute(self.drop_primary_key_string % {
"table": qn(table_name),
"constraint": qn(table_name+"_pkey"),
})
delete_primary_key = alias('drop_primary_key')
def create_primary_key(self, table_name, columns):
"""
Creates a new primary key on the specified columns.
"""
if not isinstance(columns, (list, tuple)):
columns = [columns]
qn = connection.ops.quote_name
self.execute(self.create_primary_key_string % {
"table": qn(table_name),
"constraint": qn(table_name+"_pkey"),
"columns": ", ".join(map(qn, columns)),
})
def start_transaction(self):
"""
Makes sure the following commands are inside a transaction.
Must be followed by a (commit|rollback)_transaction call.
"""
if self.dry_run:
self.pending_transactions += 1
transaction.commit_unless_managed()
transaction.enter_transaction_management()
transaction.managed(True)
def commit_transaction(self):
"""
Commits the current transaction.
Must be preceded by a start_transaction call.
"""
if self.dry_run:
return
transaction.commit()
transaction.leave_transaction_management()
def rollback_transaction(self):
"""
Rolls back the current transaction.
Must be preceded by a start_transaction call.
"""
if self.dry_run:
self.pending_transactions -= 1
transaction.rollback()
transaction.leave_transaction_management()
def rollback_transactions_dry_run(self):
"""
Rolls back all pending_transactions during this dry run.
"""
if not self.dry_run:
return
while self.pending_transactions > 0:
self.rollback_transaction()
if transaction.is_dirty():
# Force an exception, if we're still in a dirty transaction.
# This means we are missing a COMMIT/ROLLBACK.
transaction.leave_transaction_management()
def send_create_signal(self, app_label, model_names):
self.pending_create_signals.append((app_label, model_names))
def send_pending_create_signals(self):
# Group app_labels together
signals = SortedDict()
for (app_label, model_names) in self.pending_create_signals:
try:
signals[app_label].extend(model_names)
except KeyError:
signals[app_label] = list(model_names)
# Send only one signal per app.
for (app_label, model_names) in signals.iteritems():
self.really_send_create_signal(app_label, list(set(model_names)))
self.pending_create_signals = []
def really_send_create_signal(self, app_label, model_names):
"""
Sends a post_syncdb signal for the model specified.
If the model is not found (perhaps it's been deleted?),
no signal is sent.
TODO: The behavior of django.contrib.* apps seems flawed in that
they don't respect created_models. Rather, they blindly execute
over all models within the app sending the signal. This is a
patch we should push Django to make. For now, this should work.
"""
if self.debug:
print " - Sending post_syncdb signal for %s: %s" % (app_label, model_names)
app = models.get_app(app_label)
if not app:
return
created_models = []
for model_name in model_names:
model = models.get_model(app_label, model_name)
if model:
created_models.append(model)
if created_models:
# syncdb defaults -- perhaps take these as options?
verbosity = 1
interactive = True
if hasattr(dispatcher, "send"):
dispatcher.send(signal=models.signals.post_syncdb, sender=app,
app=app, created_models=created_models,
verbosity=verbosity, interactive=interactive)
else:
models.signals.post_syncdb.send(sender=app,
app=app, created_models=created_models,
verbosity=verbosity, interactive=interactive)
def mock_model(self, model_name, db_table, db_tablespace='',
pk_field_name='id', pk_field_type=models.AutoField,
pk_field_args=[], pk_field_kwargs={}):
"""
Generates a MockModel class that provides enough information
to be used by a foreign key/many-to-many relationship.
Migrations should prefer to use these rather than actual models
as models could get deleted over time, but these can remain in
migration files forever.
Deprecated.
"""
class MockOptions(object):
def __init__(self):
self.db_table = db_table
self.db_tablespace = db_tablespace or settings.DEFAULT_TABLESPACE
self.object_name = model_name
self.module_name = model_name.lower()
if pk_field_type == models.AutoField:
pk_field_kwargs['primary_key'] = True
self.pk = pk_field_type(*pk_field_args, **pk_field_kwargs)
self.pk.set_attributes_from_name(pk_field_name)
self.abstract = False
def get_field_by_name(self, field_name):
# we only care about the pk field
return (self.pk, self.model, True, False)
def get_field(self, name):
# we only care about the pk field
return self.pk
class MockModel(object):
_meta = None
# We need to return an actual class object here, not an instance
MockModel._meta = MockOptions()
MockModel._meta.model = MockModel
return MockModel
# Single-level flattening of lists
def flatten(ls):
nl = []
for l in ls:
nl += l
return nl
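Taken together, these methods are the API that hand-written migrations call through the module-level 'db' instance. A minimal sketch of a migration module using them (the app, table, and field names are hypothetical; South's generated migrations follow the same shape):

    # Hypothetical myapp/migrations/0001_initial.py, sketched against the API above.
    from django.db import models
    from south.db import db

    class Migration:

        def forwards(self, orm):
            # create_table takes (field_name, Field instance) pairs.
            db.create_table('myapp_author', (
                ('id', models.AutoField(primary_key=True)),
                ('name', models.CharField(max_length=100)),
            ))
            # Queue a post_syncdb signal for the new model (sent by the migration runner).
            db.send_create_signal('myapp', ['Author'])

        def backwards(self, orm):
            db.delete_table('myapp_author')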

south/db/mysql.py (new file, 147 lines)

@@ -0,0 +1,147 @@
from django.db import connection
from django.conf import settings
from south.db import generic
class DatabaseOperations(generic.DatabaseOperations):
"""
MySQL implementation of database operations.
"""
backend_name = "mysql"
alter_string_set_type = ''
alter_string_set_null = 'MODIFY %(column)s %(type)s NULL;'
alter_string_drop_null = 'MODIFY %(column)s %(type)s NOT NULL;'
drop_index_string = 'DROP INDEX %(index_name)s ON %(table_name)s'
drop_primary_key_string = "ALTER TABLE %(table)s DROP PRIMARY KEY"
allows_combined_alters = False
has_ddl_transactions = False
has_check_constraints = False
delete_unique_sql = "ALTER TABLE %s DROP INDEX %s"
def connection_init(self):
"""
Run before any SQL to let database-specific config be sent as a command,
e.g. which storage engine (MySQL) or transaction serialisability level.
"""
if hasattr(settings, "DATABASE_STORAGE_ENGINE") and \
settings.DATABASE_STORAGE_ENGINE:
cursor = connection.cursor()
cursor.execute("SET storage_engine=%s;" % settings.DATABASE_STORAGE_ENGINE)
def rename_column(self, table_name, old, new):
if old == new or self.dry_run:
return []
qn = connection.ops.quote_name
rows = [x for x in self.execute('DESCRIBE %s' % (qn(table_name),)) if x[0] == old]
if not rows:
raise ValueError("No column '%s' in '%s'." % (old, table_name))
params = (
qn(table_name),
qn(old),
qn(new),
rows[0][1],
rows[0][2] == "YES" and "NULL" or "NOT NULL",
rows[0][4] and "DEFAULT " or "",
rows[0][4] and "%s" or "",
rows[0][5] or "",
)
sql = 'ALTER TABLE %s CHANGE COLUMN %s %s %s %s %s %s %s;' % params
if rows[0][4]:
self.execute(sql, (rows[0][4],))
else:
self.execute(sql)
def delete_column(self, table_name, name):
qn = connection.ops.quote_name
db_name = settings.DATABASE_NAME
# See if there is a foreign key on this column
cursor = connection.cursor()
get_fkeyname_query = "SELECT tc.constraint_name FROM \
information_schema.table_constraints tc, \
information_schema.key_column_usage kcu \
WHERE tc.table_name=kcu.table_name \
AND tc.table_schema=kcu.table_schema \
AND tc.constraint_name=kcu.constraint_name \
AND tc.constraint_type='FOREIGN KEY' \
AND tc.table_schema='%s' \
AND tc.table_name='%s' \
AND kcu.column_name='%s'"
result = cursor.execute(get_fkeyname_query % (db_name, table_name, name))
# if a foreign key exists, we need to delete it first
if result > 0:
assert result == 1 #we should only have one result
fkey_name = cursor.fetchone()[0]
drop_query = "ALTER TABLE %s DROP FOREIGN KEY %s"
cursor.execute(drop_query % (qn(table_name), qn(fkey_name)))
super(DatabaseOperations, self).delete_column(table_name, name)
def rename_table(self, old_table_name, table_name):
"""
Renames the table 'old_table_name' to 'table_name'.
"""
if old_table_name == table_name:
# No Operation
return
qn = connection.ops.quote_name
params = (qn(old_table_name), qn(table_name))
self.execute('RENAME TABLE %s TO %s;' % params)
def _constraints_affecting_columns(self, table_name, columns, type="UNIQUE"):
"""
Gets the names of the constraints affecting the given columns.
"""
if self.dry_run:
raise ValueError("Cannot get constraints for columns during a dry run.")
columns = set(columns)
db_name = settings.DATABASE_NAME
# First, load all constraint->col mappings for this table.
rows = self.execute("""
SELECT kc.constraint_name, kc.column_name
FROM information_schema.key_column_usage AS kc
JOIN information_schema.table_constraints AS c ON
kc.table_schema = c.table_schema AND
kc.table_name = c.table_name AND
kc.constraint_name = c.constraint_name
WHERE
kc.table_schema = %s AND
kc.table_catalog IS NULL AND
kc.table_name = %s AND
c.constraint_type = %s
""", [db_name, table_name, type])
# Load into a dict
mapping = {}
for constraint, column in rows:
mapping.setdefault(constraint, set())
mapping[constraint].add(column)
# Find ones affecting these columns
for constraint, itscols in mapping.items():
if itscols == columns:
yield constraint
def _field_sanity(self, field):
"""
This particular override stops us sending DEFAULTs for BLOB/TEXT columns.
"""
if field.db_type().upper() in ["BLOB", "TEXT", "LONGTEXT"]:
field._suppress_default = True
return field
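The connection_init hook above means a storage engine can be forced from settings before any migration SQL runs; a sketch (the setting name is read by the code above; the value is illustrative):

    # settings.py (illustrative): South will issue "SET storage_engine=INNODB;"
    # on each connection before executing migration SQL on MySQL.
    DATABASE_STORAGE_ENGINE = "INNODB"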

south/db/postgresql_psycopg2.py (new file, 64 lines)

@@ -0,0 +1,64 @@
from django.db import connection, models
from south.db import generic
class DatabaseOperations(generic.DatabaseOperations):
"""
PsycoPG2 implementation of database operations.
"""
backend_name = "postgres"
def rename_column(self, table_name, old, new):
if old == new:
return []
qn = connection.ops.quote_name
params = (qn(table_name), qn(old), qn(new))
self.execute('ALTER TABLE %s RENAME COLUMN %s TO %s;' % params)
def rename_table(self, old_table_name, table_name):
"will rename the table and an associated ID sequence and primary key index"
# First, rename the table
generic.DatabaseOperations.rename_table(self, old_table_name, table_name)
# Then, try renaming the ID sequence
# (if you're using other AutoFields... your problem, unfortunately)
self.commit_transaction()
self.start_transaction()
try:
generic.DatabaseOperations.rename_table(self, old_table_name+"_id_seq", table_name+"_id_seq")
except:
if self.debug:
print " ~ No such sequence (ignoring error)"
self.rollback_transaction()
else:
self.commit_transaction()
self.start_transaction()
# Rename the primary key index. This will not rename other indexes on
# the table that are used by Django (e.g. foreign keys); until we
# figure out how, you need to do that yourself.
try:
generic.DatabaseOperations.rename_table(self, old_table_name+"_pkey", table_name+ "_pkey")
except:
if self.debug:
print " ~ No such primary key (ignoring error)"
self.rollback_transaction()
else:
self.commit_transaction()
self.start_transaction()
def rename_index(self, old_index_name, index_name):
"Rename an index individually"
generic.DatabaseOperations.rename_table(self, old_index_name, index_name)
def _db_type_for_alter_column(self, field):
"""
Returns a field's type suitable for ALTER COLUMN.
Strips CHECKs from PositiveSmallIntegerField and PositiveIntegerField.
@param field: The field to generate type for
"""
if isinstance(field, models.PositiveSmallIntegerField) or isinstance(field, models.PositiveIntegerField):
return field.db_type().split(" ")[0]
return super(DatabaseOperations, self)._db_type_for_alter_column(field)
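So a single rename on PostgreSQL tries to keep the implicit sequence and primary-key index in step; a sketch with hypothetical table names:

    # Illustrative: one rename_table call on PostgreSQL attempts three renames.
    from south.db import db

    db.start_transaction()
    db.rename_table('myapp_author', 'myapp_writer')
    # Also attempted internally (failures are swallowed if the objects are absent):
    #   myapp_author_id_seq -> myapp_writer_id_seq
    #   myapp_author_pkey   -> myapp_writer_pkey
    db.commit_transaction()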

south/db/sql_server/.gitignore (vendored, new file, 1 line)

@@ -0,0 +1 @@
/*.pyc

south/db/sql_server/__init__.py (new, empty file)

south/db/sql_server/pyodbc.py (new file, 148 lines)

@@ -0,0 +1,148 @@
from django.db import connection
from django.db.models.fields import *
from south.db import generic
class DatabaseOperations(generic.DatabaseOperations):
"""
django-pyodbc (sql_server.pyodbc) implementation of database operations.
"""
backend_name = "pyodbc"
add_column_string = 'ALTER TABLE %s ADD %s;'
alter_string_set_type = 'ALTER COLUMN %(column)s %(type)s'
alter_string_drop_null = 'ALTER COLUMN %(column)s %(type)s NOT NULL'
allows_combined_alters = False
drop_index_string = 'DROP INDEX %(index_name)s ON %(table_name)s'
drop_constraint_string = 'ALTER TABLE %(table_name)s DROP CONSTRAINT %(constraint_name)s'
delete_column_string = 'ALTER TABLE %s DROP COLUMN %s'
def delete_column(self, table_name, name):
qn = connection.ops.quote_name
q_table_name, q_name = (qn(table_name), qn(name))
# Zap the indexes
for ind in self._find_indexes_for_column(table_name,name):
params = {'table_name':q_table_name, 'index_name': ind}
sql = self.drop_index_string % params
self.execute(sql, [])
# Zap the constraints
for const in self._find_constraints_for_column(table_name,name):
params = {'table_name':q_table_name, 'constraint_name': const}
sql = self.drop_constraint_string % params
self.execute(sql, [])
# Finally zap the column itself
self.execute(self.delete_column_string % (q_table_name, q_name), [])
def _find_indexes_for_column(self, table_name, name):
"Find the indexes that apply to a column, needed when deleting"
qn = connection.ops.quote_name
q_table_name, q_name = (qn(table_name), qn(name))
sql = """
SELECT si.name, si.id, sik.colid, sc.name
FROM dbo.sysindexes SI WITH (NOLOCK)
INNER JOIN dbo.sysindexkeys SIK WITH (NOLOCK)
ON SIK.id = Si.id
AND SIK.indid = SI.indid
INNER JOIN dbo.syscolumns SC WITH (NOLOCK)
ON SI.id = SC.id
AND SIK.colid = SC.colid
WHERE SI.indid !=0
AND Si.id = OBJECT_ID('%s')
AND SC.name = '%s'
"""
idx = self.execute(sql % (table_name, name), [])
return [i[0] for i in idx]
def _find_constraints_for_column(self, table_name, name):
"Find the constraints that apply to a column, needed when deleting"
qn = connection.ops.quote_name
q_table_name, q_name = (qn(table_name), qn(name))
sql = """
SELECT
Cons.xtype,
Cons.id,
Cons.[name]
FROM dbo.sysobjects AS Cons WITH(NOLOCK)
INNER JOIN (
SELECT [id], colid, name
FROM dbo.syscolumns WITH(NOLOCK)
WHERE id = OBJECT_ID('%s')
AND name = '%s'
) AS Cols
ON Cons.parent_obj = Cols.id
WHERE Cons.parent_obj = OBJECT_ID('%s')
AND (
(OBJECTPROPERTY(Cons.[id],'IsConstraint') = 1
AND Cons.info = Cols.colid)
OR (OBJECTPROPERTY(Cons.[id],'IsForeignKey') = 1
AND LEFT(Cons.name,%d) = '%s')
)
"""
cons = self.execute(sql % (table_name, name, table_name, len(name), name), [])
return [c[2] for c in cons]
def drop_column_default_sql(self, table_name, name, q_name):
"MSSQL specific drop default, which is a pain"
sql = """
SELECT object_name(cdefault)
FROM syscolumns
WHERE id = object_id('%s')
AND name = '%s'
"""
cons = self.execute(sql % (table_name, name), [])
if cons and cons[0] and cons[0][0]:
return "DROP CONSTRAINT %s" % cons[0][0]
return None
def _fix_field_definition(self, field):
if isinstance(field, BooleanField):
if field.default == True:
field.default = 1
if field.default == False:
field.default = 0
def add_column(self, table_name, name, field, keep_default=True):
self._fix_field_definition(field)
generic.DatabaseOperations.add_column(self, table_name, name, field, keep_default)
def create_table(self, table_name, fields):
# Tweak stuff as needed
for name,f in fields:
self._fix_field_definition(f)
# Run
generic.DatabaseOperations.create_table(self, table_name, fields)
def rename_column(self, table_name, old, new):
"""
Renames the column of 'table_name' from 'old' to 'new'.
WARNING - This isn't transactional on MSSQL!
"""
if old == new:
# No Operation
return
# Examples on the MS site show the table name not being quoted...
qn = connection.ops.quote_name
params = (table_name,qn(old), qn(new))
self.execute("EXEC sp_rename '%s.%s', %s, 'COLUMN'" % params)
def rename_table(self, old_table_name, table_name):
"""
Renames the table 'old_table_name' to 'table_name'.
WARNING - This isn't transactional on MSSQL!
"""
if old_table_name == table_name:
# No Operation
return
qn = connection.ops.quote_name
params = (qn(old_table_name), qn(table_name))
self.execute('EXEC sp_rename %s, %s' % params)

south/db/sqlite3.py (new file, 225 lines)

@@ -0,0 +1,225 @@
import inspect
import re
from django.db import connection
from django.db.models import ForeignKey
from south.db import generic
# from how .schema works as shown on http://www.sqlite.org/sqlite.html
GET_TABLE_DEF_SQL = """
SELECT sql FROM
(SELECT * FROM sqlite_master UNION ALL
SELECT * FROM sqlite_temp_master)
WHERE tbl_name LIKE '%s'
AND type!='meta' AND sql NOT NULL AND name NOT LIKE 'sqlite_%%%%'
ORDER BY substr(type,2,1), name;"""
class DatabaseOperations(generic.DatabaseOperations):
"""
SQLite3 implementation of database operations.
"""
backend_name = "sqlite3"
# SQLite ignores foreign key constraints. I wish I could.
supports_foreign_keys = False
defered_alters = {}
def __init__(self):
super(DatabaseOperations, self).__init__()
# Holds field definitions taken from the SQL schema. The key is the table name;
# the value is a list of two-item lists of [field name, SQL definition].
self._fields = {}
def _populate_current_structure(self, table_name, force=False):
# get if we don't have it already or are being forced to refresh it
if force or not table_name in self._fields.keys():
cursor = connection.cursor()
cursor.execute(GET_TABLE_DEF_SQL % table_name)
create_table = cursor.fetchall()[0][0]
first = create_table.find('(')
last = create_table.rfind(')')
# rip out the CREATE TABLE xxx ( ) and only get the field definitions plus
# add the trailing comma to make the next part easier
fields_part = create_table[first+1: last] + ','
# pull out the field name and definition for each field
self._fields[table_name] = re.findall(r'"(\S+?)"(.*?),', fields_part, re.DOTALL)
def _rebuild_table(self, table_name, new_fields):
"""
Rebuilds the table using the new definitions. Only one change can be
made per call, and it must be either a rename, an alter, or a delete.
"""
self._populate_current_structure(table_name)
current_fields = self._fields[table_name]
temp_table_name = '%s_temp' % table_name
operation = None
changed_field = None
if len(current_fields) != len(new_fields):
if len(current_fields) - len(new_fields) != 1:
raise ValueError('only one field can be deleted at a time, found %s missing fields' % str(len(current_fields) - len(new_fields)))
operation = 'delete'
current_field_names = [f[0] for f in current_fields]
new_field_names = [f[0] for f in new_fields]
# find the deleted field
for f in current_field_names:
if not f in new_field_names:
changed_field = f
break
else:
found = False
for current, new in zip(current_fields, new_fields):
if current[0] != new[0]:
if found:
raise ValueError('can only handle one change per call, found more than one')
operation = 'rename'
changed_field = (current[0], new[0])
found = True
elif current[1] != new[1]:
if found:
raise ValueError('can only handle one change per call, found more than one')
operation = 'alter'
changed_field = current[0]
found = True
if not found:
raise ValueError('no change found')
# create new table as temp
create = 'CREATE TABLE "%s" ( %s )'
fields_sql = ','.join(['"%s" %s' % (f[0], f[1]) for f in new_fields])
sql = create % (temp_table_name, fields_sql)
cursor = connection.cursor()
cursor.execute(sql)
# copy over data
# rename, redef or delete?
if operation in ['rename', 'alter']:
sql = 'insert into %s select * from %s' % (temp_table_name, table_name)
elif operation == 'delete':
new_field_names = ','.join(['"%s"' % f[0] for f in new_fields])
sql = 'insert into %s select %s from %s' % (temp_table_name, new_field_names, table_name)
cursor.execute(sql)
# remove existing table
self.delete_table(table_name)
# rename new table
self.rename_table(temp_table_name, table_name)
# repopulate field info
self._populate_current_structure(table_name, force=True)
def _defer_alter_sqlite_table(self, table_name, field_renames={}):
table_renames = self.defered_alters.get(table_name, {})
table_renames.update(field_renames)
self.defered_alters[table_name] = table_renames
# You can't add UNIQUE columns with an ALTER TABLE.
def add_column(self, table_name, name, field, *args, **kwds):
# Run ALTER TABLE with no unique column
unique, field._unique, field.db_index = field.unique, False, False
# If it's not nullable, and has no default, raise an error (SQLite is picky)
if (not field.null and
(not field.has_default() or field.get_default() is None) and
not field.empty_strings_allowed):
raise ValueError("You cannot add a null=False column without a default value.")
# Don't try and drop the default, it'll fail
kwds['keep_default'] = True
generic.DatabaseOperations.add_column(self, table_name, name, field, *args, **kwds)
# If it _was_ unique, make an index on it.
if unique:
self.create_index(table_name, [field.column], unique=True)
def _alter_sqlite_table(self, table_name, field_renames={}):
# Detect the model for the given table name
model = None
for omodel in self.current_orm:
if omodel._meta.db_table == table_name:
model = omodel
if model is None:
raise ValueError("Cannot find ORM model for '%s'." % table_name)
temp_name = table_name + "_temporary_for_schema_change"
self.rename_table(table_name, temp_name)
fields = [(fld.name, fld) for fld in model._meta.fields]
self.create_table(table_name, fields)
columns = [fld.column for name, fld in fields]
self.copy_data(temp_name, table_name, columns, field_renames)
self.delete_table(temp_name, cascade=False)
def alter_column(self, table_name, name, field, explicit_name=True):
self._populate_current_structure(table_name)
new_fields = []
for field_name, field_def in self._fields[table_name]:
if field_name == name:
if isinstance(field, ForeignKey):
field_name = name[:-3] # exclude the _id when calling column_sql
else:
field_name = name
new_fields.append((name, self.column_sql(table_name, field_name, field)))
else:
new_fields.append((field_name, field_def))
self._rebuild_table(table_name, new_fields)
def delete_column(self, table_name, column_name):
self._populate_current_structure(table_name)
new_fields = []
for field_name, field_def in self._fields[table_name]:
if field_name != column_name:
new_fields.append((field_name, field_def))
self._rebuild_table(table_name, new_fields)
def rename_column(self, table_name, old, new):
self._populate_current_structure(table_name)
new_fields = []
for field_name, field_def in self._fields[table_name]:
if field_name == old:
new_fields.append((new, field_def))
else:
new_fields.append((field_name, field_def))
self._rebuild_table(table_name, new_fields)
# Nor unique creation
def create_unique(self, table_name, columns):
"""
Not supported under SQLite.
"""
print " ! WARNING: SQLite does not support adding unique constraints. Ignored."
# Nor unique deletion
def delete_unique(self, table_name, columns):
"""
Not supported under SQLite.
"""
print " ! WARNING: SQLite does not support removing unique constraints. Ignored."
# No cascades on deletes
def delete_table(self, table_name, cascade=True):
generic.DatabaseOperations.delete_table(self, table_name, False)
def copy_data(self, src, dst, fields, field_renames={}):
qn = connection.ops.quote_name
q_fields = [field for field in fields]
for old, new in field_renames.items():
q_fields[q_fields.index(new)] = "%s AS %s" % (old, qn(new))
sql = "INSERT INTO %s SELECT %s FROM %s;" % (qn(dst), ', '.join(q_fields), qn(src))
self.execute(sql)
def execute_deferred_sql(self):
"""
Executes all deferred SQL, resetting the deferred_sql list
"""
for table_name, params in self.defered_alters.items():
self._alter_sqlite_table(table_name, params)
self.defered_alters = {}
generic.DatabaseOperations.execute_deferred_sql(self)
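Since SQLite cannot ALTER most column properties, every alter, rename, or delete above goes through the rebuild path. A sketch of what a single rename triggers (hypothetical table and column names):

    # Illustrative: on SQLite, rename_column rebuilds the table behind the scenes:
    #   1. read the CREATE TABLE statement via GET_TABLE_DEF_SQL,
    #   2. CREATE TABLE myapp_author_temp with the renamed field list,
    #   3. INSERT INTO myapp_author_temp SELECT * FROM myapp_author,
    #   4. drop the old table and rename the temp table back.
    from south.db import db

    db.rename_column('myapp_author', 'name', 'full_name')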

south/hacks/.gitignore (vendored, new file, 1 line)

@@ -0,0 +1 @@
/*.pyc

south/hacks/__init__.py (new file, 10 lines)

@@ -0,0 +1,10 @@
"""
The hacks module encapsulates all the horrible things that play with Django
internals in one, evil place.
This top file will automagically expose the correct Hacks class.
"""
# Currently, these work for 1.0 and 1.1.
from south.hacks.django_1_0 import Hacks
hacks = Hacks()

south/hacks/django_1_0.py (new file, 71 lines)

@@ -0,0 +1,71 @@
"""
Hacks for the Django 1.0/1.0.2 releases.
"""
from django.conf import settings
from django.db import models
from django.db.models.loading import AppCache, cache
class Hacks:
def set_installed_apps(self, apps):
"""
Sets Django's INSTALLED_APPS setting to be effectively the list passed in.
"""
# Make sure it's a list.
apps = list(apps)
# This function will be monkeypatched into place.
def new_get_apps():
return apps
# Monkeypatch in!
models.get_apps_old, models.get_apps = models.get_apps, new_get_apps
settings.INSTALLED_APPS, settings.OLD_INSTALLED_APPS = (
apps,
settings.INSTALLED_APPS,
)
self._redo_app_cache()
def reset_installed_apps(self):
"""
Undoes the effect of set_installed_apps.
"""
models.get_apps = models.get_apps_old
settings.INSTALLED_APPS = settings.OLD_INSTALLED_APPS
self._redo_app_cache()
def _redo_app_cache(self):
"""
Used to repopulate AppCache after fiddling with INSTALLED_APPS.
"""
a = AppCache()
a.loaded = False
a._populate()
def clear_app_cache(self):
"""
Clears the contents of AppCache to a blank state, so new models
from the ORM can be added.
"""
self.old_app_models = cache.app_models
cache.app_models = {}
def unclear_app_cache(self):
"""
Reverses the effects of clear_app_cache.
"""
cache.app_models = self.old_app_models
def repopulate_app_cache(self):
"""
Rebuilds AppCache with the real model definitions.
"""
cache._populate()
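A sketch of how the rest of South can drive these hooks to run against a temporary app list (the app names are illustrative):

    # Illustrative use of the Hacks API defined above.
    from south.hacks import hacks

    hacks.set_installed_apps(['django.contrib.contenttypes', 'myapp'])
    try:
        pass  # ... work against the reduced app set ...
    finally:
        hacks.reset_installed_apps()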

south/introspection_plugins/.gitignore (new file, 1 line)

@@ -0,0 +1 @@
/*.pyc

south/introspection_plugins/__init__.py (new file, 6 lines)

@@ -0,0 +1,6 @@
# This module contains built-in introspector plugins for various common
# Django apps.
# These imports pull in the plugin modules below, which register their rules.
import south.introspection_plugins.geodjango
import south.introspection_plugins.django_tagging

south/introspection_plugins/django_tagging.py (new file, 19 lines)

@@ -0,0 +1,19 @@
from south.modelsinspector import add_introspection_rules
try:
from tagging.fields import TagField
except ImportError:
pass
else:
rules = [
(
(TagField, ),
[],
{
"blank": ["blank", {"default": True}],
"max_length": ["max_length", {"default": 255}],
},
),
]
add_introspection_rules(rules, ["^tagging\.fields",])
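The same pattern extends to any third-party or project-local field. A hedged sketch for a hypothetical custom field (only add_introspection_rules is from this commit; the field and module are made up):

    # Hypothetical: teaching South to introspect a custom field of your own.
    from south.modelsinspector import add_introspection_rules
    from myapp.fields import LowerCaseCharField  # hypothetical field class

    add_introspection_rules([
        (
            (LowerCaseCharField, ),  # field classes the rules apply to
            [],                      # positional args to reconstruct
            {"max_length": ["max_length", {"default": 50}]},  # keyword args
        ),
    ], ["^myapp\.fields"])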

south/introspection_plugins/geodjango.py (new file, 44 lines)

@@ -0,0 +1,44 @@
"""
GeoDjango introspection rules
"""
import django
from django.conf import settings
from south.modelsinspector import add_introspection_rules
has_gis = "django.contrib.gis" in settings.INSTALLED_APPS
if has_gis:
# Alright, import the field
from django.contrib.gis.db.models.fields import GeometryField
# Make some introspection rules
if django.VERSION[0] == 1 and django.VERSION[1] >= 1:
# Django 1.1's gis module renamed these.
rules = [
(
(GeometryField, ),
[],
{
"srid": ["srid", {"default": 4326}],
"spatial_index": ["spatial_index", {"default": True}],
"dim": ["dim", {"default": 2}],
},
),
]
else:
rules = [
(
(GeometryField, ),
[],
{
"srid": ["_srid", {"default": 4326}],
"spatial_index": ["_index", {"default": True}],
"dim": ["_dim", {"default": 2}],
},
),
]
# Install them
add_introspection_rules(rules, ["^django\.contrib\.gis"])

south/logger.py (new file, 26 lines)

@@ -0,0 +1,26 @@
import sys
import logging
from django.conf import settings
class NullHandler(logging.Handler):
def emit(self, record):
pass
h = NullHandler()
_logger = logging.getLogger("south")
_logger.addHandler(h)
_logger.setLevel(logging.DEBUG)
# TODO: Add a log formatter?
def get_logger():
debug_on = getattr(settings, "SOUTH_LOGGING_ON", False)
logging_file = getattr(settings, "SOUTH_LOGGING_FILE", False)
if debug_on:
if logging_file:
_logger.addHandler( logging.FileHandler(logging_file) )
_logger.setLevel(logging.DEBUG)
else:
raise IOError, "SOUTH_LOGGING_ON is True. You also need a SOUTH_LOGGING_FILE setting."
return _logger
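generic.py already routes every executed statement through this logger (see its execute method); with the two settings configured, other code can write to the same file. A minimal sketch:

    # Illustrative: emit a line into the same log South writes to.
    from south.logger import get_logger

    get_logger().debug("about to run a custom data-migration step")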

south/management/.gitignore (vendored, new file, 1 line)

@@ -0,0 +1 @@
/*.pyc

south/management/__init__.py (new, empty file)

south/management/commands/.gitignore (vendored, new file, 1 line)

@@ -0,0 +1 @@
/*.pyc

south/management/commands/__init__.py (new, empty file)

south/management/commands/convert_to_south.py (new file, 65 lines)

@@ -0,0 +1,65 @@
from django.core.management.base import BaseCommand
from django.core.management.color import no_style
from django.conf import settings
from django.db import models
from django.core import management
from optparse import make_option
from django.core.exceptions import ImproperlyConfigured
from south.migration import get_app
from south.hacks import hacks
import sys
class Command(BaseCommand):
option_list = BaseCommand.option_list
if '--verbosity' not in [opt.get_opt_string() for opt in BaseCommand.option_list]:
option_list += (
make_option('--verbosity', action='store', dest='verbosity', default='1',
type='choice', choices=['0', '1', '2'],
help='Verbosity level; 0=minimal output, 1=normal output, 2=all output'),
)
help = "Quickly converts the named application to use South if it is currently using syncdb."
def handle(self, app=None, *args, **options):
# Make sure we have an app
if not app:
print "Please specify an app to convert."
return
# See if the app exists
app = app.split(".")[-1]
try:
app_module = models.get_app(app)
except ImproperlyConfigured:
print "There is no enabled application matching '%s'." % app
return
# Try to get its list of models
model_list = models.get_models(app_module)
if not model_list:
print "This application has no models; this command is for applications that already have models syncdb'd."
print "Make some models, and then use ./manage.py startmigration %s --initial instead." % app
return
# Ask South if it thinks it's already got migrations
if get_app(app_module):
print "This application is already managed by South."
return
# Finally! It seems we've got a candidate, so do the two-command trick
verbosity = int(options.get('verbosity', 0))
management.call_command("startmigration", app, initial=True, verbosity=verbosity)
# Now, we need to re-clean and sanitise appcache
hacks.clear_app_cache()
hacks.repopulate_app_cache()
# Now, migrate
management.call_command("migrate", app, "0001", fake=True, verbosity=verbosity)
print
print "App '%s' converted. Note that South assumed the application's models matched the database" % app
print "(i.e. you haven't changed it since last syncdb); if you have, you should delete the %s/migrations" % app
print "directory, revert models.py so it matches the database, and try again."

south/management/commands/migrate.py (new file, 120 lines)

@@ -0,0 +1,120 @@
"""
Migrate management command.
"""
import sys
from optparse import make_option
from django.core.management.base import BaseCommand
from django.core.management.color import no_style
from django.conf import settings
from django.db import models
from south import migration
class Command(BaseCommand):
option_list = BaseCommand.option_list + (
make_option('--all', action='store_true', dest='all_apps', default=False,
help='Run the specified migration for all apps.'),
make_option('--list', action='store_true', dest='list', default=False,
help='List migrations noting those that have been applied'),
make_option('--skip', action='store_true', dest='skip', default=False,
help='Will skip over out-of-order missing migrations'),
make_option('--merge', action='store_true', dest='merge', default=False,
help='Will run out-of-order missing migrations as they are - no rollbacks.'),
make_option('--no-initial-data', action='store_true', dest='no_initial_data', default=False,
help='Skips loading initial data if specified.'),
make_option('--fake', action='store_true', dest='fake', default=False,
help="Pretends to do the migrations, but doesn't actually execute them."),
make_option('--db-dry-run', action='store_true', dest='db_dry_run', default=False,
help="Doesn't execute the SQL generated by the db methods, and doesn't store a record that the migration(s) occurred. Useful to test migrations before applying them."),
)
if '--verbosity' not in [opt.get_opt_string() for opt in BaseCommand.option_list]:
option_list += (
make_option('--verbosity', action='store', dest='verbosity', default='1',
type='choice', choices=['0', '1', '2'],
help='Verbosity level; 0=minimal output, 1=normal output, 2=all output'),
)
help = "Runs migrations for all apps."
args = "[appname] [migrationname|zero] [--all] [--list] [--skip] [--merge] [--no-initial-data] [--fake] [--db-dry-run]"
def handle(self, app=None, target=None, skip=False, merge=False, backwards=False, fake=False, db_dry_run=False, list=False, **options):
# Work out what the resolve mode is
resolve_mode = merge and "merge" or (skip and "skip" or None)
# NOTE: THIS IS DUPLICATED FROM django.core.management.commands.syncdb
# This code imports any module named 'management' in INSTALLED_APPS.
# The 'management' module is the preferred way of listening to post_syncdb
# signals, and since we're sending those out with create_table migrations,
# we need apps to behave correctly.
for app_name in settings.INSTALLED_APPS:
try:
__import__(app_name + '.management', {}, {}, [''])
except ImportError, exc:
msg = exc.args[0]
if not msg.startswith('No module named') or 'management' not in msg:
raise
# END DJANGO DUPE CODE
# if all_apps flag is set, shift app over to target
if options.get('all_apps', False):
target = app
app = None
# Migrate each app
if app:
apps = [migration.get_app(app.split(".")[-1])]
if apps == [None]:
print "The app '%s' does not appear to use migrations." % app
print "./manage.py migrate " + self.args
return
else:
apps = migration.get_migrated_apps()
if list and apps:
list_migrations(apps)
if not list:
tree = migration.dependency_tree()
for app in apps:
result = migration.migrate_app(
app,
tree,
resolve_mode = resolve_mode,
target_name = target,
fake = fake,
db_dry_run = db_dry_run,
verbosity = int(options.get('verbosity', 0)),
load_inital_data = not options.get('no_initial_data', False),
skip = skip,
)
if result is False:
return
def list_migrations(apps):
from south.models import MigrationHistory
apps = list(apps)
names = [migration.get_app_name(app) for app in apps]
applied_migrations = MigrationHistory.objects.filter(app_name__in=names)
applied_migrations = ['%s.%s' % (mi.app_name,mi.migration) for mi in applied_migrations]
print
for app in apps:
print migration.get_app_name(app)
all_migrations = migration.get_migration_names(app)
for migration_name in all_migrations:
long_form = '%s.%s' % (migration.get_app_name(app),migration_name)
if long_form in applied_migrations:
print format_migration_list_item(migration_name)
else:
print format_migration_list_item(migration_name, applied=False)
print
def format_migration_list_item(name, applied=True):
if applied:
return ' * %s' % name
return ' %s' % name
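A sketch of common invocations, expressed as call_command equivalents of the argument spec above (app and migration names are illustrative):

    # Illustrative call_command equivalents of './manage.py migrate ...'.
    from django.core import management

    management.call_command("migrate")                    # all migrated apps
    management.call_command("migrate", "myapp")           # one app, to the latest
    management.call_command("migrate", "myapp", "0003")   # to a named migration
    management.call_command("migrate", "myapp", "zero")   # all the way back
    management.call_command("migrate", "myapp", db_dry_run=True)  # don't execute the SQL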

south/management/commands/startmigration.py (diff suppressed because it is too large)

south/management/commands/syncdb.py (new file, 83 lines)

@@ -0,0 +1,83 @@
from django.core.management.base import NoArgsCommand, BaseCommand
from django.core.management.color import no_style
from django.utils.datastructures import SortedDict
from optparse import make_option
from south import migration
from south.db import db
from django.core.management.commands import syncdb
from django.conf import settings
from django.db import models
from django.db.models.loading import cache
from django.core import management
import sys
def get_app_name(app):
return '.'.join( app.__name__.split('.')[0:-1] )
class Command(NoArgsCommand):
option_list = NoArgsCommand.option_list + (
make_option('--noinput', action='store_false', dest='interactive', default=True,
help='Tells Django to NOT prompt the user for input of any kind.'),
make_option('--migrate', action='store_true', dest='migrate', default=False,
help='Tells South to also perform migrations after the sync. Default for during testing, and other internal calls.'),
make_option('--all', action='store_true', dest='migrate_all', default=False,
help='Makes syncdb work on all apps, even migrated ones. Be careful!'),
)
if '--verbosity' not in [opt.get_opt_string() for opt in BaseCommand.option_list]:
option_list += (
make_option('--verbosity', action='store', dest='verbosity', default='1',
type='choice', choices=['0', '1', '2'],
help='Verbosity level; 0=minimal output, 1=normal output, 2=all output'),
)
help = "Create the database tables for all apps in INSTALLED_APPS whose tables haven't already been created, except those which use migrations."
def handle_noargs(self, migrate_all=False, **options):
# Work out what uses migrations and so doesn't need syncing
apps_needing_sync = []
apps_migrated = []
for app in models.get_apps():
app_name = get_app_name(app)
migrations = migration.get_app(app)
if migrations is None or migrate_all:
apps_needing_sync.append(app_name)
else:
# This is a migrated app, leave it
apps_migrated.append(app_name)
verbosity = int(options.get('verbosity', 0))
# Run syncdb on only the ones needed
if verbosity:
print "Syncing..."
old_installed, settings.INSTALLED_APPS = settings.INSTALLED_APPS, apps_needing_sync
old_app_store, cache.app_store = cache.app_store, SortedDict([
(k, v) for (k, v) in cache.app_store.items()
if get_app_name(k) in apps_needing_sync
])
# This will allow the setting of the MySQL storage engine, for example.
db.connection_init()
# OK, run the actual syncdb
syncdb.Command().execute(**options)
settings.INSTALLED_APPS = old_installed
cache.app_store = old_app_store
# Migrate if needed
if options.get('migrate', True):
if verbosity:
print "Migrating..."
management.call_command('migrate', **options)
# Be obvious about what we did
if verbosity:
print "\nSynced:\n > %s" % "\n > ".join(apps_needing_sync)
if options.get('migrate', True):
if verbosity:
print "\nMigrated:\n - %s" % "\n - ".join(apps_migrated)
else:
if verbosity:
print "\nNot synced (use migrations):\n - %s" % "\n - ".join(apps_migrated)
print "(use ./manage.py migrate to migrate these)"

south/management/commands/test.py (new file, 27 lines)

@@ -0,0 +1,27 @@
from django.core import management
from django.core.management.commands import test
from django.core.management.commands import syncdb
from django.conf import settings
from syncdb import Command as SyncDbCommand
class MigrateAndSyncCommand(SyncDbCommand):
option_list = SyncDbCommand.option_list
for opt in option_list:
if "--migrate" == opt.get_opt_string():
opt.default = True
break
class Command(test.Command):
def handle(self, *args, **kwargs):
management.get_commands()
if not hasattr(settings, "SOUTH_TESTS_MIGRATE") or not settings.SOUTH_TESTS_MIGRATE:
# point at the core syncdb command when creating tests
# tests should always be up to date with the most recent model structure
management._commands['syncdb'] = 'django.core'
else:
management._commands['syncdb'] = MigrateAndSyncCommand()
super(Command, self).handle(*args, **kwargs)

south/management/commands/testserver.py (new file, 27 lines)

@@ -0,0 +1,27 @@
from django.core import management
from django.core.management.commands import testserver
from django.core.management.commands import syncdb
from django.conf import settings
from syncdb import Command as SyncDbCommand
class MigrateAndSyncCommand(SyncDbCommand):
option_list = SyncDbCommand.option_list
for opt in option_list:
if "--migrate" == opt.get_opt_string():
opt.default = True
break
class Command(testserver.Command):
def handle(self, *args, **kwargs):
management.get_commands()
if not hasattr(settings, "SOUTH_TESTS_MIGRATE") or not settings.SOUTH_TESTS_MIGRATE:
# point at the core syncdb command when creating tests
# tests should always be up to date with the most recent model structure
management._commands['syncdb'] = 'django.core'
else:
management._commands['syncdb'] = MigrateAndSyncCommand()
super(Command, self).handle(*args, **kwargs)

612
south/migration.py Normal file
View file

@@ -0,0 +1,612 @@
"""
Main migration logic.
"""
import datetime
import os
import sys
import traceback
import inspect
from django.conf import settings
from django.db import models
from django.core.exceptions import ImproperlyConfigured
from django.core.management import call_command
from south.models import MigrationHistory
from south.db import db
from south.orm import LazyFakeORM, FakeORM
from south.signals import *
def get_app(app):
"""
Returns the migrations module for the given app model name/module, or None
if it does not use migrations.
"""
if isinstance(app, (str, unicode)):
# If it's a string, use the models module
app = models.get_app(app)
mod = __import__(app.__name__[:-7], {}, {}, ['migrations'])
if hasattr(mod, 'migrations'):
return getattr(mod, 'migrations')
def get_migrated_apps():
"""
Returns all apps with migrations.
"""
for mapp in models.get_apps():
app = get_app(mapp)
if app:
yield app
def get_app_name(app):
"""
Returns the _internal_ app name for the given app module.
i.e. for <module django.contrib.auth.models> will return 'auth'
"""
return app.__name__.split('.')[-2]
def get_app_fullname(app):
"""
Returns the full python name of an app - e.g. django.contrib.auth
"""
return app.__name__[:-11]
def short_from_long(app_name):
return app_name.split(".")[-1]
def get_migration_names(app):
"""
Returns a list of migration file names for the given app.
"""
if getattr(settings, "SOUTH_USE_PYC", False):
allowed_extensions = (".pyc", ".py")
ignored_files = ("__init__.pyc", "__init__.py")
else:
allowed_extensions = (".py",)
ignored_files = ("__init__.py",)
return sorted(set([
os.path.splitext(filename)[0]
for filename in os.listdir(os.path.dirname(app.__file__))
if os.path.splitext(filename)[1] in allowed_extensions and filename not in ignored_files and not filename.startswith(".")
]))
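# Illustrative only: given a migrations directory containing
#     __init__.py, 0001_initial.py, 0002_add_field.py
# this returns ['0001_initial', '0002_add_field'] (file names hypothetical).
# With SOUTH_USE_PYC = True in settings.py, compiled .pyc migrations are
# picked up as well.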
def get_migration_classes(app):
"""
Returns a list of migration classes (one for each migration) for the app.
"""
for name in get_migration_names(app):
yield get_migration(app, name)
def get_migration(app, name):
"""
Returns the migration class implied by 'name'.
"""
try:
module = __import__(app.__name__ + "." + name, '', '', ['Migration'])
migclass = module.Migration
migclass.orm = LazyFakeORM(migclass, get_app_name(app))
module._ = lambda x: x # Fake i18n
module.datetime = datetime
return migclass
except ImportError:
print " ! Migration %s:%s probably doesn't exist." % (get_app_name(app), name)
print " - Traceback:"
raise
except Exception:
print "While loading migration '%s.%s':" % (get_app_name(app), name)
raise
def all_migrations():
return dict([
(app, dict([(name, get_migration(app, name)) for name in get_migration_names(app)]))
for app in get_migrated_apps()
])
def dependency_tree():
tree = all_migrations()
# Annotate tree with 'backwards edges'
for app, classes in tree.items():
for name, cls in classes.items():
if not hasattr(cls, "_dependency_parents"):
cls._dependency_parents = []
if not hasattr(cls, "_dependency_children"):
cls._dependency_children = []
# Get forwards dependencies
if hasattr(cls, "depends_on"):
for dapp, dname in cls.depends_on:
dapp = get_app(dapp)
if dapp not in tree:
print "Migration %s in app %s depends on unmigrated app %s." % (
name,
get_app_name(app),
dapp,
)
sys.exit(1)
if dname not in tree[dapp]:
print "Migration %s in app %s depends on nonexistent migration %s in app %s." % (
name,
get_app_name(app),
dname,
get_app_name(dapp),
)
sys.exit(1)
cls._dependency_parents.append((dapp, dname))
if not hasattr(tree[dapp][dname], "_dependency_children"):
tree[dapp][dname]._dependency_children = []
tree[dapp][dname]._dependency_children.append((app, name))
# Get backwards dependencies
if hasattr(cls, "needed_by"):
for dapp, dname in cls.needed_by:
dapp = get_app(dapp)
if dapp not in tree:
print "Migration %s in app %s claims to be needed by unmigrated app %s." % (
name,
get_app_name(app),
dapp,
)
sys.exit(1)
if dname not in tree[dapp]:
print "Migration %s in app %s claims to be needed by nonexistent migration %s in app %s." % (
name,
get_app_name(app),
dname,
get_app_name(dapp),
)
sys.exit(1)
cls._dependency_children.append((dapp, dname))
if not hasattr(tree[dapp][dname], "_dependency_parents"):
tree[dapp][dname]._dependency_parents = []
tree[dapp][dname]._dependency_parents.append((app, name))
# Sanity check whole tree
for app, classes in tree.items():
for name, cls in classes.items():
cls.dependencies = dependencies(tree, app, name)
return tree
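# Illustrative only (not part of the original module): the two attributes
# dependency_tree() reads off a migration class. App and migration names
# below are hypothetical.
class _ExampleMigration:
    # These (app, migration) pairs must be applied before this migration...
    depends_on = [("accounts", "0002_add_profile")]
    # ...and this migration must be applied before these.
    needed_by = [("billing", "0003_link_invoices")]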
def nice_trace(trace):
return " -> ".join([str((get_app_name(a), n)) for a, n in trace])
def dependencies(tree, app, name, trace=[]):
# Copy trace to stop pass-by-ref problems
trace = trace[:]
# Sanity check
for papp, pname in trace:
if app == papp:
if pname == name:
print "Found circular dependency: %s" % nice_trace(trace + [(app,name)])
sys.exit(1)
else:
# See if a lower migration in the same app ends up depending on a higher one
migrations = get_migration_names(app)
if migrations.index(name) > migrations.index(pname):
print "Found a lower migration (%s) depending on a higher migration (%s) in the same app (%s)." % (pname, name, get_app_name(app))
print "Path: %s" % nice_trace(trace + [(app,name)])
sys.exit(1)
# Get the dependencies of a migration
deps = []
migration = tree[app][name]
for dapp, dname in migration._dependency_parents:
deps.extend(
dependencies(tree, dapp, dname, trace+[(app,name)])
)
return deps
def remove_duplicates(l):
m = []
for x in l:
if x not in m:
m.append(x)
return m
def needed_before_forwards(tree, app, name, sameapp=True):
"""
Returns a list of migrations that must be applied before (app, name),
in the order they should be applied.
Used to make sure a migration can be applied (and to help apply up to it).
"""
app_migrations = get_migration_names(app)
needed = []
if sameapp:
for aname in app_migrations[:app_migrations.index(name)]:
needed += needed_before_forwards(tree, app, aname, False)
needed += [(app, aname)]
for dapp, dname in tree[app][name]._dependency_parents:
needed += needed_before_forwards(tree, dapp, dname)
needed += [(dapp, dname)]
return remove_duplicates(needed)
def needed_before_backwards(tree, app, name, sameapp=True):
"""
Returns a list of migrations that must be unapplied before (app, name) is,
in the order they should be unapplied.
Used to make sure a migration can be unapplied (and to help unapply up to it).
"""
app_migrations = get_migration_names(app)
needed = []
if sameapp:
for aname in reversed(app_migrations[app_migrations.index(name)+1:]):
needed += needed_before_backwards(tree, app, aname, False)
needed += [(app, aname)]
for dapp, dname in tree[app][name]._dependency_children:
needed += needed_before_backwards(tree, dapp, dname)
needed += [(dapp, dname)]
return remove_duplicates(needed)
def run_migrations(toprint, torun, recorder, app, migrations, fake=False, db_dry_run=False, verbosity=0):
"""
Runs the specified migrations forwards/backwards, in order.
"""
for migration in migrations:
app_name = get_app_name(app)
if verbosity:
print toprint % (app_name, migration)
# Get migration class
klass = get_migration(app, migration)
# Find its predecessor, and attach the ORM from that as prev_orm.
all_names = get_migration_names(app)
idx = all_names.index(migration)
# First migration? The 'previous ORM' is empty.
if idx == 0:
klass.prev_orm = FakeORM(None, app)
else:
klass.prev_orm = get_migration(app, all_names[idx-1]).orm
# If this is a 'fake' migration, do nothing.
if fake:
if verbosity:
print " (faked)"
# OK, we should probably do something then.
else:
runfunc = getattr(klass(), torun)
args = inspect.getargspec(runfunc)
# Get the correct ORM.
if torun == "forwards":
orm = klass.orm
else:
orm = klass.prev_orm
db.current_orm = orm
# If the database doesn't support running DDL inside a transaction
# *cough*MySQL*cough* then do a dry run first.
if not db.has_ddl_transactions or db_dry_run:
if not (hasattr(klass, "no_dry_run") and klass.no_dry_run):
db.dry_run = True
# Only hide SQL if this is an automatic dry run.
if not db.has_ddl_transactions:
db.debug, old_debug = False, db.debug
pending_creates = db.get_pending_creates()
db.start_transaction()
try:
if len(args[0]) == 1: # They don't want an ORM param
runfunc()
else:
runfunc(orm)
db.rollback_transactions_dry_run()
except:
traceback.print_exc()
print " ! Error found during dry run of migration! Aborting."
return False
if not db.has_ddl_transactions:
db.debug = old_debug
db.clear_run_data(pending_creates)
db.dry_run = False
elif db_dry_run:
print " - Migration '%s' is marked for no-dry-run." % migration
# If they really wanted to dry-run, then quit!
if db_dry_run:
return
if db.has_ddl_transactions:
db.start_transaction()
try:
if len(args[0]) == 1: # They don't want an ORM param
runfunc()
else:
runfunc(orm)
db.execute_deferred_sql()
except:
if db.has_ddl_transactions:
db.rollback_transaction()
raise
else:
traceback.print_exc()
print " ! Error found during real run of migration! Aborting."
print
print " ! Since you have a database that does not support running"
print " ! schema-altering statements in transactions, we have had to"
print " ! leave it in an interim state between migrations."
if torun == "forwards":
print
print " ! You *might* be able to recover with:"
db.debug = db.dry_run = True
if len(args[0]) == 1:
klass().backwards()
else:
klass().backwards(klass.prev_orm)
print
print " ! The South developers regret this has happened, and would"
print " ! like to gently persuade you to consider a slightly"
print " ! easier-to-deal-with DBMS."
return False
else:
if db.has_ddl_transactions:
db.commit_transaction()
if not db_dry_run:
# Record us as having done this
recorder(app_name, migration)
if not fake:
# Send a signal saying it ran
# Actually, don't - we're implementing this properly in 0.7
#ran_migration.send(None, app=app_name, migration=migration, method=torun)
pass
def run_forwards(app, migrations, fake=False, db_dry_run=False, verbosity=0):
"""
Runs the specified migrations forwards, in order.
"""
def record(app_name, migration):
# Record us as having done this
record = MigrationHistory.for_migration(app_name, migration)
record.applied = datetime.datetime.utcnow()
record.save()
return run_migrations(
toprint = " > %s: %s",
torun = "forwards",
recorder = record,
app = app,
migrations = migrations,
fake = fake,
db_dry_run = db_dry_run,
verbosity = verbosity,
)
def run_backwards(app, migrations, ignore=[], fake=False, db_dry_run=False, verbosity=0):
"""
Runs the specified migrations backwards, in order, skipping those
migrations in 'ignore'.
"""
def record(app_name, migration):
# Record us as having not done this
record = MigrationHistory.for_migration(app_name, migration)
record.delete()
return run_migrations(
toprint = " < %s: %s",
torun = "backwards",
recorder = record,
app = app,
migrations = [x for x in migrations if x not in ignore],
fake = fake,
db_dry_run = db_dry_run,
verbosity = verbosity,
)
def right_side_of(x, y):
    return left_side_of(list(reversed(x)), list(reversed(y)))
def left_side_of(x, y):
    # Listify both so len() and slicing work even when given iterators
    # (reversed() objects have no len()).
    x, y = list(x), list(y)
    return y[:len(x)] == x
def forwards_problems(tree, forwards, done, verbosity=0):
problems = []
for app, name in forwards:
if (app, name) not in done:
for dapp, dname in needed_before_backwards(tree, app, name):
if (dapp, dname) in done:
print " ! Migration (%s, %s) should not have been applied before (%s, %s) but was." % (get_app_name(dapp), dname, get_app_name(app), name)
problems.append(((app, name), (dapp, dname)))
return problems
def backwards_problems(tree, backwards, done, verbosity=0):
problems = []
for app, name in backwards:
if (app, name) in done:
for dapp, dname in needed_before_forwards(tree, app, name):
if (dapp, dname) not in done:
print " ! Migration (%s, %s) should have been applied before (%s, %s) but wasn't." % (get_app_name(dapp), dname, get_app_name(app), name)
problems.append(((app, name), (dapp, dname)))
return problems
def migrate_app(app, tree, target_name=None, resolve_mode=None, fake=False, db_dry_run=False, yes=False, verbosity=0, load_inital_data=False, skip=False):
app_name = get_app_name(app)
verbosity = int(verbosity)
db.debug = (verbosity > 1)
# Fire off the pre-migrate signal
pre_migrate.send(None, app=app_name)
# Find out what delightful migrations we have
migrations = get_migration_names(app)
# If there aren't any, quit quizzically
if not migrations:
print "? You have no migrations for the '%s' app. You might want some." % app_name
return
if target_name not in migrations and target_name not in ["zero", None]:
matches = [x for x in migrations if x.startswith(target_name)]
if len(matches) == 1:
target = migrations.index(matches[0]) + 1
if verbosity:
print " - Soft matched migration %s to %s." % (
target_name,
matches[0]
)
target_name = matches[0]
elif len(matches) > 1:
if verbosity:
print " - Prefix %s matches more than one migration:" % target_name
print " " + "\n ".join(matches)
return
else:
print " ! '%s' is not a migration." % target_name
return
# Check there's no strange ones in the database
ghost_migrations = []
for m in MigrationHistory.objects.filter(applied__isnull = False):
try:
if get_app(m.app_name) not in tree or m.migration not in tree[get_app(m.app_name)]:
ghost_migrations.append(m)
except ImproperlyConfigured:
pass
if ghost_migrations:
print " ! These migrations are in the database but not on disk:"
print " - " + "\n - ".join(["%s: %s" % (x.app_name, x.migration) for x in ghost_migrations])
print " ! I'm not trusting myself; fix this yourself by fiddling"
print " ! with the south_migrationhistory table."
return
# Say what we're doing
if verbosity:
print "Running migrations for %s:" % app_name
# Get the forwards and reverse dependencies for this target
if target_name == None:
target_name = migrations[-1]
if target_name == "zero":
forwards = []
backwards = needed_before_backwards(tree, app, migrations[0]) + [(app, migrations[0])]
else:
forwards = needed_before_forwards(tree, app, target_name) + [(app, target_name)]
# When migrating backwards we want to unapply up to and including the
# migration just above the target in this app; needed_before_backwards
# then pulls in anything in other apps that depends on those.
try:
migration_before_here = migrations[migrations.index(target_name)+1]
backwards = needed_before_backwards(tree, app, migration_before_here) + [(app, migration_before_here)]
except IndexError:
backwards = []
# Get the list of currently applied migrations from the db
current_migrations = []
for m in MigrationHistory.objects.filter(applied__isnull = False):
try:
current_migrations.append((get_app(m.app_name), m.migration))
except ImproperlyConfigured:
pass
direction = None
bad = False
# Work out the direction
applied_for_this_app = list(MigrationHistory.objects.filter(app_name=app_name, applied__isnull=False).order_by("migration"))
if target_name == "zero":
direction = -1
elif not applied_for_this_app:
direction = 1
elif migrations.index(target_name) > migrations.index(applied_for_this_app[-1].migration):
direction = 1
elif migrations.index(target_name) < migrations.index(applied_for_this_app[-1].migration):
direction = -1
else:
direction = None
# Is the whole forward branch applied?
missing = [step for step in forwards if step not in current_migrations]
# If they're all applied, we only know it's not backwards
if not missing:
direction = None
# If the remaining migrations are strictly a right segment of the forwards
# trace, we just need to go forwards to our target (and check for badness)
else:
problems = forwards_problems(tree, forwards, current_migrations, verbosity=verbosity)
if problems:
bad = True
direction = 1
# What about the whole backward trace then?
if not bad:
missing = [step for step in backwards if step not in current_migrations]
# If they're all missing, stick with the forwards decision
if missing == backwards:
pass
# If what's missing is a strict left segment of backwards (i.e.
# all the higher migrations) then we need to go backwards
else:
problems = backwards_problems(tree, backwards, current_migrations, verbosity=verbosity)
if problems:
bad = True
direction = -1
if bad and resolve_mode not in ['merge'] and not skip:
print " ! Inconsistent migration history"
print " ! The following options are available:"
print " --merge: will just attempt the migration ignoring any potential dependency conflicts."
sys.exit(1)
if direction == 1:
if verbosity:
print " - Migrating forwards to %s." % target_name
try:
for mapp, mname in forwards:
if (mapp, mname) not in current_migrations:
result = run_forwards(mapp, [mname], fake=fake, db_dry_run=db_dry_run, verbosity=verbosity)
if result is False: # The migrations errored, but nicely.
return False
finally:
# Call any pending post_syncdb signals
db.send_pending_create_signals()
# Now load initial data, only if we're really doing things and ended up at current
if not fake and not db_dry_run and load_inital_data and target_name == migrations[-1]:
if verbosity:
print " - Loading initial data for %s." % app_name
# Override Django's get_apps call temporarily to only load from the
# current app
old_get_apps, models.get_apps = (
models.get_apps,
lambda: [models.get_app(get_app_name(app))],
)
# Load the initial fixture
call_command('loaddata', 'initial_data', verbosity=verbosity)
# Un-override
models.get_apps = old_get_apps
elif direction == -1:
if verbosity:
print " - Migrating backwards to just after %s." % target_name
for mapp, mname in backwards:
if (mapp, mname) in current_migrations:
run_backwards(mapp, [mname], fake=fake, db_dry_run=db_dry_run, verbosity=verbosity)
else:
if verbosity:
print "- Nothing to migrate."
# Finally, fire off the post-migrate signal
post_migrate.send(None, app=app_name)
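# Illustrative only (not part of the original module): migrate_app is normally
# reached through the 'migrate' management command; thanks to the prefix
# matching above, targets can be abbreviated. Names are hypothetical.
def _example_migrate_to_target():
    from django.core.management import call_command
    call_command("migrate", "fakeapp", "0002")  # soft-matches e.g. 0002_add_field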

19
south/models.py Normal file
View file

@@ -0,0 +1,19 @@
from django.db import models
class MigrationHistory(models.Model):
app_name = models.CharField(max_length=255)
migration = models.CharField(max_length=255)
applied = models.DateTimeField(blank=True, null=True)
@classmethod
def for_migration(cls, app_name, migration):
try:
return cls.objects.get(
app_name = app_name,
migration = migration,
)
except cls.DoesNotExist:
return cls(
app_name = app_name,
migration = migration,
)
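# Illustrative only (not part of the original module): this mirrors how the
# recorders in south.migration use the model (names hypothetical).
def _example_mark_applied():
    import datetime
    record = MigrationHistory.for_migration("fakeapp", "0001_initial")
    record.applied = datetime.datetime.utcnow()
    record.save()    # run_backwards' recorder calls record.delete() instead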

322
south/modelsinspector.py Normal file
View file

@@ -0,0 +1,322 @@
"""
Like south.modelsparser, but using introspection where possible
rather than direct inspection of models.py.
"""
import datetime
import re
import modelsparser
from south.utils import get_attribute
from django.db import models
from django.db.models.base import ModelBase, Model
from django.db.models.fields import NOT_PROVIDED
from django.conf import settings
from django.utils.functional import Promise
from django.contrib.contenttypes import generic
from django.utils.datastructures import SortedDict
NOISY = True
# Gives information about how to introspect certain fields.
# This is a list of triples; the first item is a list of fields it applies to,
# (note that isinstance is used, so superclasses are perfectly valid here)
# the second is a list of positional argument descriptors, and the third
# is a list of keyword argument descriptors.
# Descriptors are of the form:
# [attrname, options]
# Where attrname is the attribute on the field to get the value from, and options
# is an optional dict.
#
# The introspector uses the combination of all matching entries, in order.
introspection_details = [
(
(models.Field, ),
[],
{
"null": ["null", {"default": False}],
"blank": ["blank", {"default": False, "ignore_if":"primary_key"}],
"primary_key": ["primary_key", {"default": False}],
"max_length": ["max_length", {"default": None}],
"unique": ["_unique", {"default": False}],
"db_index": ["db_index", {"default": False}],
"default": ["default", {"default": NOT_PROVIDED}],
"db_column": ["db_column", {"default": None}],
"db_tablespace": ["db_tablespace", {"default": settings.DEFAULT_INDEX_TABLESPACE}],
},
),
(
(models.ForeignKey, models.OneToOneField),
[],
{
"to": ["rel.to", {}],
"to_field": ["rel.field_name", {"default_attr": "rel.to._meta.pk.name"}],
"related_name": ["rel.related_name", {"default": None}],
"db_index": ["db_index", {"default": True}],
},
),
(
(models.ManyToManyField,),
[],
{
"to": ["rel.to", {}],
"symmetrical": ["rel.symmetrical", {"default": True}],
},
),
(
(models.DateField, models.TimeField),
[],
{
"auto_now": ["auto_now", {"default": False}],
"auto_now_add": ["auto_now_add", {"default": False}],
},
),
(
(models.DecimalField, ),
[],
{
"max_digits": ["max_digits", {"default": None}],
"decimal_places": ["decimal_places", {"default": None}],
},
),
(
(models.BooleanField, ),
[],
{
"default": ["default", {"default": NOT_PROVIDED, "converter": bool}],
},
),
(
(models.FilePathField, ),
[],
{
"path": ["path", {"default": ''}],
"match": ["match", {"default": None}],
"recursive": ["recursive", {"default": False}],
},
),
(
(generic.GenericRelation, ),
[],
{
"to": ["rel.to", {}],
"symmetrical": ["rel.symmetrical", {"default": True}],
"object_id_field": ["object_id_field_name", {"default": "object_id"}],
"content_type_field": ["content_type_field_name", {"default": "content_type"}],
"blank": ["blank", {"default": True}],
},
),
]
# Regexes of allowed field full paths
allowed_fields = [
"^django\.db",
"^django\.contrib\.contenttypes\.generic",
"^django\.contrib\.localflavor",
]
# Similar, but for Meta, so just the inner level (kwds).
meta_details = {
"db_table": ["db_table", {"default_attr_concat": ["%s_%s", "app_label", "module_name"]}],
"db_tablespace": ["db_tablespace", {"default": settings.DEFAULT_TABLESPACE}],
"unique_together": ["unique_together", {"default": []}],
}
# Python 2.4 compatibility (no built-in any() before 2.5)
any = lambda x: reduce(lambda y, z: y or z, x, False)
def add_introspection_rules(rules=[], patterns=[]):
"Allows you to add some introspection rules at runtime, e.g. for 3rd party apps."
assert isinstance(rules, (list, tuple))
assert isinstance(patterns, (list, tuple))
allowed_fields.extend(patterns)
introspection_details.extend(rules)
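# Illustrative only (not part of the original module): registering rules for a
# hypothetical third-party field so can_introspect()/introspector() accept it.
def _example_register_custom_field():
    class EncryptedTextField(models.TextField):
        def __init__(self, cipher="aes", **kwargs):
            self.cipher = cipher
            super(EncryptedTextField, self).__init__(**kwargs)
    add_introspection_rules(
        rules=[(
            (EncryptedTextField, ),                       # matched via isinstance
            [],                                           # no positional args
            {"cipher": ["cipher", {"default": "aes"}]},   # keyword descriptor
        )],
        patterns=["^myapp\.fields\."],                    # hypothetical module path
    )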
def can_introspect(field):
"""
Returns True if we are allowed to introspect this field, False otherwise.
('allowed' means 'in core'. Custom fields can declare they are introspectable
by the default South rules by adding the attribute _south_introspects = True.)
"""
# Check for special attribute
if hasattr(field, "_south_introspects") and field._south_introspects:
return True
# Check it's an introspectable field
full_name = "%s.%s" % (field.__class__.__module__, field.__class__.__name__)
for regex in allowed_fields:
if re.match(regex, full_name):
return True
return False
def matching_details(field):
"""
Returns the union of all matching entries in introspection_details for the field.
"""
our_args = []
our_kwargs = {}
for classes, args, kwargs in introspection_details:
if any([isinstance(field, x) for x in classes]):
our_args.extend(args)
our_kwargs.update(kwargs)
return our_args, our_kwargs
class IsDefault(Exception):
"""
Exception for when a field contains its default value.
"""
def get_value(field, descriptor):
"""
Gets an attribute value from a Field instance and formats it.
"""
attrname, options = descriptor
value = get_attribute(field, attrname)
# Lazy-eval functions get eval'd.
if isinstance(value, Promise):
value = unicode(value)
# If the value is the same as the default, omit it for clarity
if "default" in options and value == options['default']:
raise IsDefault
# If there's an ignore_if, use it
if "ignore_if" in options:
if get_attribute(field, options['ignore_if']):
raise IsDefault
# Some default values need to be gotten from an attribute too.
if "default_attr" in options:
default_value = get_attribute(field, options['default_attr'])
if value == default_value:
raise IsDefault
# Some are made from a formatting string and several attrs (e.g. db_table)
if "default_attr_concat" in options:
format, attrs = options['default_attr_concat'][0], options['default_attr_concat'][1:]
default_value = format % tuple(map(lambda x: get_attribute(field, x), attrs))
if value == default_value:
raise IsDefault
# Callables get called.
if callable(value) and not isinstance(value, ModelBase):
# Datetime.datetime.now is special, as we can access it from the eval
# context (and because it changes all the time; people will file bugs otherwise).
if value == datetime.datetime.now:
return "datetime.datetime.now"
if value == datetime.datetime.utcnow:
return "datetime.datetime.utcnow"
if value == datetime.date.today:
return "datetime.date.today"
# All other callables get called.
value = value()
# Models get their own special repr()
if isinstance(value, ModelBase):
# If it's a proxy model, follow it back to its non-proxy parent
if getattr(value._meta, "proxy", False):
value = value._meta.proxy_for_model
return "orm['%s.%s']" % (value._meta.app_label, value._meta.object_name)
# As do model instances
if isinstance(value, Model):
return "orm['%s.%s'].objects.get(pk=%r)" % (value.__class__._meta.app_label, value.__class__._meta.object_name, value.pk)
# Now, apply the converter func if there is one
if "converter" in options:
value = options['converter'](value)
# Return the final value
return repr(value)
def introspector(field):
"""
Given a field, introspects its definition triple.
"""
arg_defs, kwarg_defs = matching_details(field)
args = []
kwargs = {}
# For each argument, use the descriptor to get the real value.
for defn in arg_defs:
try:
args.append(get_value(field, defn))
except IsDefault:
pass
for kwd, defn in kwarg_defs.items():
try:
kwargs[kwd] = get_value(field, defn)
except IsDefault:
pass
return args, kwargs
def get_model_fields(model, m2m=False):
"""
Given a model class, returns a dict of {field_name: field_triple} defs.
"""
field_defs = SortedDict()
inherited_fields = {}
# Go through all bases (that are themselves models, but not Model)
for base in model.__bases__:
if base != models.Model and issubclass(base, models.Model):
if not base._meta.abstract:
# Looks like we need their fields, Ma.
inherited_fields.update(get_model_fields(base))
# Now, ask the parser to have a look at this model too.
try:
parser_fields = modelsparser.get_model_fields(model, m2m) or {}
except (TypeError, IndentationError): # Almost certainly a not-real module
parser_fields = {}
# Now, go through all the fields and try to get their definition
source = model._meta.local_fields[:]
if m2m:
source += model._meta.local_many_to_many
for field in source:
# Does it define a south_field_triple method?
if hasattr(field, "south_field_triple"):
if NOISY:
print " ( Nativing field: %s" % field.name
field_defs[field.name] = field.south_field_triple()
# Can we introspect it?
elif can_introspect(field):
#if NOISY:
# print "Introspecting field: %s" % field.name
# Get the full field class path.
field_class = field.__class__.__module__ + "." + field.__class__.__name__
# Run this field through the introspector
args, kwargs = introspector(field)
# That's our definition!
field_defs[field.name] = (field_class, args, kwargs)
# Hmph. Is it parseable?
elif parser_fields.get(field.name, None):
if NOISY:
print " ( Parsing field: %s" % field.name
field_defs[field.name] = parser_fields[field.name]
# Shucks, no definition!
else:
if NOISY:
print " ( Nodefing field: %s" % field.name
field_defs[field.name] = None
return field_defs
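# Illustrative only (not part of the original module): the south_field_triple
# hook checked above, on a hypothetical custom field. Every element of the
# triple is a source string, matching what introspector() produces.
def _example_custom_field_triple():
    class UpperCaseField(models.CharField):
        def south_field_triple(self):
            return (
                "myapp.fields.UpperCaseField",       # hypothetical import path
                [],
                {"max_length": repr(self.max_length)},
            )
    return UpperCaseField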
def get_model_meta(model):
"""
Given a model class, will return the dict representing the Meta class.
"""
# Get the introspected attributes
meta_def = {}
for kwd, defn in meta_details.items():
try:
meta_def[kwd] = get_value(model._meta, defn)
except IsDefault:
pass
return meta_def
# Now, load the built-in South introspection plugins
import south.introspection_plugins

428
south/modelsparser.py Normal file
View file

@@ -0,0 +1,428 @@
"""
Parsing module for models.py files. Extracts information in a more reliable
way than inspect + regexes.
Now only used as a fallback when introspection and the South custom hook both fail.
"""
import re
import inspect
import parser
import symbol
import token
import keyword
import datetime
from django.db import models
from django.contrib.contenttypes import generic
from django.utils.datastructures import SortedDict
from django.core.exceptions import ImproperlyConfigured
def name_that_thing(thing):
"Turns a symbol/token int into its name."
for name in dir(symbol):
if getattr(symbol, name) == thing:
return "symbol.%s" % name
for name in dir(token):
if getattr(token, name) == thing:
return "token.%s" % name
return str(thing)
def thing_that_name(name):
"Turns a name of a symbol/token into its integer value."
if name in dir(symbol):
return getattr(symbol, name)
if name in dir(token):
return getattr(token, name)
raise ValueError("Cannot convert '%s'" % name)
def prettyprint(tree, indent=0, omit_singles=False):
"Prettyprints the tree, with symbol/token names. For debugging."
if omit_singles and isinstance(tree, tuple) and len(tree) == 2:
return prettyprint(tree[1], indent, omit_singles)
if isinstance(tree, tuple):
return " (\n%s\n" % "".join([prettyprint(x, indent+1) for x in tree]) + \
(" " * indent) + ")"
elif isinstance(tree, int):
return (" " * indent) + name_that_thing(tree)
else:
return " " + repr(tree)
def isclass(obj):
"Simple test to see if something is a class."
return issubclass(type(obj), type)
def aliased_models(module):
"""
Given a models module, returns a dict mapping all alias imports of models
(e.g. import Foo as Bar) back to their original names. Bug #134.
"""
aliases = {}
for name, obj in module.__dict__.items():
if isclass(obj) and issubclass(obj, models.Model) and obj is not models.Model:
# Test to see if this has a different name to what it should
if name != obj._meta.object_name:
aliases[name] = obj._meta.object_name
return aliases
class STTree(object):
"A syntax tree wrapper class."
def __init__(self, tree):
self.tree = tree
def __eq__(self, other):
return other.tree == self.tree
def __hash__(self):
return hash(self.tree)
@property
def root(self):
return self.tree[0]
@property
def value(self):
return self.tree
def walk(self, recursive=True):
"""
Yields (symbol, subtree) for the entire subtree.
Comes out with node 1, node 1's children, node 2, etc.
"""
stack = [self.tree]
done_outer = False
while stack:
atree = stack.pop()
if isinstance(atree, tuple):
if done_outer:
yield atree[0], STTree(atree)
if recursive or not done_outer:
for bit in reversed(atree[1:]):
stack.append(bit)
done_outer = True
def flatten(self):
"Yields the tokens/symbols in the tree only, in order."
bits = []
for sym, subtree in self.walk():
if sym in token_map:
bits.append(sym)
elif sym == token.NAME:
bits.append(subtree.value)
elif sym == token.STRING:
bits.append(subtree.value)
elif sym == token.NUMBER:
bits.append(subtree.value)
return bits
def reform(self):
"Prints how the tree's input probably looked."
return reform(self.flatten())
def findAllType(self, ntype, recursive=True):
"Returns all nodes with the given type in the tree."
for symbol, subtree in self.walk(recursive=recursive):
if symbol == ntype:
yield subtree
def find(self, selector):
"""
Searches the syntax tree with a CSS-like selector syntax.
You can use things like 'suite simple_stmt', 'suite, simple_stmt'
or 'suite > simple_stmt'. Not guaranteed to return in order.
"""
# Split up the overall parts
patterns = [x.strip() for x in selector.split(",")]
results = []
for pattern in patterns:
# Split up the parts
parts = re.split(r'(?:[\s]|(>))+', pattern)
# Take the first part, use it for results
if parts[0] == "^":
subresults = [self]
else:
subresults = list(self.findAllType(thing_that_name(parts[0])))
recursive = True
# For each remaining part, do something
for part in parts[1:]:
if not subresults:
break
if part == ">":
recursive = False
elif not part:
pass
else:
thing = thing_that_name(part)
newresults = [
list(tree.findAllType(thing, recursive))
for tree in subresults
]
subresults = []
for stuff in newresults:
subresults.extend(stuff)
recursive = True
results.extend(subresults)
return results
def __str__(self):
return prettyprint(self.tree)
__repr__ = __str__
def get_model_tree(model):
# Get the source of the model's file
try:
source = inspect.getsource(model).replace("\r\n", "\n").replace("\r","\n") + "\n"
except IOError:
return None
tree = STTree(parser.suite(source).totuple())
# Now, we have to find it
for poss in tree.find("compound_stmt"):
if poss.value[1][0] == symbol.classdef and \
poss.value[1][2][1].lower() == model.__name__.lower():
# This is the tree
return poss
token_map = {
token.DOT: ".",
token.LPAR: "(",
token.RPAR: ")",
token.EQUAL: "=",
token.EQEQUAL: "==",
token.COMMA: ",",
token.LSQB: "[",
token.RSQB: "]",
token.AMPER: "&",
token.BACKQUOTE: "`",
token.CIRCUMFLEX: "^",
token.CIRCUMFLEXEQUAL: "^=",
token.COLON: ":",
token.DOUBLESLASH: "//",
token.DOUBLESLASHEQUAL: "//=",
token.DOUBLESTAR: "**",
token.DOUBLESLASHEQUAL: "**=",
token.GREATER: ">",
token.LESS: "<",
token.GREATEREQUAL: ">=",
token.LESSEQUAL: "<=",
token.LBRACE: "{",
token.RBRACE: "}",
token.SEMI: ";",
token.PLUS: "+",
token.MINUS: "-",
token.STAR: "*",
token.SLASH: "/",
token.VBAR: "|",
token.PERCENT: "%",
token.TILDE: "~",
token.AT: "@",
token.NOTEQUAL: "!=",
token.LEFTSHIFT: "<<",
token.RIGHTSHIFT: ">>",
token.LEFTSHIFTEQUAL: "<<=",
token.RIGHTSHIFTEQUAL: ">>=",
token.PLUSEQUAL: "+=",
token.MINEQUAL: "-=",
token.STAREQUAL: "*=",
token.SLASHEQUAL: "/=",
token.VBAREQUAL: "|=",
token.PERCENTEQUAL: "%=",
token.AMPEREQUAL: "&=",
}
def reform(bits):
"Returns the string that the list of tokens/symbols 'bits' represents"
output = ""
for bit in bits:
if bit in token_map:
output += token_map[bit]
elif bit[0] in [token.NAME, token.STRING, token.NUMBER]:
if keyword.iskeyword(bit[1]):
output += " %s " % bit[1]
else:
if bit[1] not in symbol.sym_name:
output += bit[1]
return output
def parse_arguments(argstr):
"""
Takes a string representing arguments and returns the positional and
keyword argument list and dict respectively.
All the entries in these are python source, except the dict keys.
"""
# Get the tree
tree = STTree(parser.suite(argstr).totuple())
# Initialise the lists
curr_kwd = None
args = []
kwds = {}
# Walk through, assigning things
testlists = tree.find("testlist")
for i, testlist in enumerate(testlists):
# BTW: A testlist is to the left or right of an =.
items = list(testlist.walk(recursive=False))
for j, item in enumerate(items):
if item[0] == symbol.test:
if curr_kwd:
kwds[curr_kwd] = item[1].reform()
curr_kwd = None
elif j == len(items)-1 and i != len(testlists)-1:
# Last item in a group must be a keyword, unless it's last overall
curr_kwd = item[1].reform()
else:
args.append(item[1].reform())
return args, kwds
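# Illustrative only: a worked example of the above. Everything comes back as
# Python source strings (the dict keys excepted), not evaluated values.
def _example_parse_arguments():
    args, kwds = parse_arguments("'name', max_length=100, blank=True")
    # args -> ["'name'"]; kwds -> {"max_length": "100", "blank": "True"}
    return args, kwds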
def extract_field(tree):
# Collapses the tree and tries to parse it as a field def
bits = tree.flatten()
## Check it looks right:
# Second token should be equals
if len(bits) < 2 or bits[1] != token.EQUAL:
return
## Split into meaningful sections
name = bits[0][1]
declaration = bits[2:]
# Find the first LPAR; stuff before that is the class.
try:
lpar_at = declaration.index(token.LPAR)
except ValueError:
return
clsname = reform(declaration[:lpar_at])
# Now, inside that, find the last RPAR, and we'll take the stuff between
# them as the arguments
declaration.reverse()
rpar_at = (len(declaration) - 1) - declaration.index(token.RPAR)
declaration.reverse()
args = declaration[lpar_at+1:rpar_at]
# Now, extract the arguments as a list and dict
try:
args, kwargs = parse_arguments(reform(args))
except SyntaxError:
return
# OK, extract and reform it
return name, clsname, args, kwargs
def get_model_fields(model, m2m=False):
"""
Given a model class, will return the dict of name: field_constructor
mappings.
"""
tree = get_model_tree(model)
if tree is None:
return None
possible_field_defs = tree.find("^ > classdef > suite > stmt > simple_stmt > small_stmt > expr_stmt")
field_defs = {}
# Get aliases, ready for alias fixing (#134)
try:
aliases = aliased_models(models.get_app(model._meta.app_label))
except ImproperlyConfigured:
aliases = {}
# Go through all the found defns, and try to parse them
for pfd in possible_field_defs:
field = extract_field(pfd)
if field:
field_defs[field[0]] = field[1:]
inherited_fields = {}
# Go through all bases (that are themselves models, but not Model)
for base in model.__bases__:
if base != models.Model and issubclass(base, models.Model):
inherited_fields.update(get_model_fields(base, m2m))
# Now, go through all the fields and try to get their definition
source = model._meta.local_fields[:]
if m2m:
source += model._meta.local_many_to_many
fields = SortedDict()
for field in source:
# Get its name
fieldname = field.name
if isinstance(field, (models.related.RelatedObject, generic.GenericRel)):
continue
# Now, try to get the defn
if fieldname in field_defs:
fields[fieldname] = field_defs[fieldname]
# Try the South definition workaround?
elif hasattr(field, 'south_field_triple'):
fields[fieldname] = field.south_field_triple()
elif hasattr(field, 'south_field_definition'):
print "Your custom field %s provides the outdated south_field_definition method.\nPlease consider implementing south_field_triple too; it's more reliably evaluated." % field
fields[fieldname] = field.south_field_definition()
# Try a parent?
elif fieldname in inherited_fields:
fields[fieldname] = inherited_fields[fieldname]
# Is it a _ptr?
elif fieldname.endswith("_ptr"):
fields[fieldname] = ("models.OneToOneField", ["orm['%s.%s']" % (field.rel.to._meta.app_label, field.rel.to._meta.object_name)], {})
# Try a default for 'id'.
elif fieldname == "id":
fields[fieldname] = ("models.AutoField", [], {"primary_key": "True"})
else:
fields[fieldname] = None
# Now, try seeing if we can resolve the values of defaults, and fix aliases.
for field, defn in fields.items():
if not isinstance(defn, (list, tuple)):
continue # We don't have a defn for this one, or it's a string
# Fix aliases if we can (#134)
for i, arg in enumerate(defn[1]):
if arg in aliases:
defn[1][i] = aliases[arg]
# Fix defaults if we can
for arg, val in defn[2].items():
if arg in ['default']:
try:
# Evaluate it in a close-to-real fake model context
real_val = eval(val, __import__(model.__module__, {}, {}, ['']).__dict__, model.__dict__)
# If we can't resolve it, stick it in verbatim
except:
pass # TODO: Raise nice error here?
# Hm, OK, we got a value. Callables are not frozen (see #132, #135)
else:
if callable(real_val):
# HACK
# However, if it's datetime.now, etc., that's special
for datetime_key in datetime.datetime.__dict__.keys():
# No, you can't use __dict__.values. It's different.
dtm = getattr(datetime.datetime, datetime_key)
if real_val == dtm:
if not val.startswith("datetime.datetime"):
defn[2][arg] = "datetime." + val
break
else:
defn[2][arg] = repr(real_val)
return fields

365
south/orm.py Normal file
View file

@@ -0,0 +1,365 @@
"""
South's fake ORM; lets you not have to write SQL inside migrations.
Roughly emulates the real Django ORM, to a point.
"""
import inspect
import datetime
from django.db import models
from django.db.models.loading import cache
from django.core.exceptions import ImproperlyConfigured
from south.db import db
from south.utils import ask_for_it_by_name
from south.hacks import hacks
class ModelsLocals(object):
"""
Custom dictionary-like class to be locals();
falls back to lowercase search for items that don't exist
(because we store model names as lowercase).
"""
def __init__(self, data):
self.data = data
def __getitem__(self, key):
try:
return self.data[key]
except KeyError:
return self.data[key.lower()]
# Stores already-created ORMs.
_orm_cache = {}
def FakeORM(*args):
"""
Creates a Fake Django ORM.
This is actually a memoised constructor; the real class is _FakeORM.
"""
if not args in _orm_cache:
_orm_cache[args] = _FakeORM(*args)
return _orm_cache[args]
class LazyFakeORM(object):
"""
In addition to memoising the ORM call, this function lazily generates them
for a Migration class. Assign the result of this to (for example)
.orm, and as soon as .orm is accessed the ORM will be created.
"""
def __init__(self, *args):
self._args = args
self.orm = None
def __get__(self, obj, type=None):
if not self.orm:
self.orm = FakeORM(*self._args)
return self.orm
class _FakeORM(object):
"""
Simulates the Django ORM at some point in time,
using a frozen definition on the Migration class.
"""
def __init__(self, cls, app):
self.default_app = app
self.cls = cls
# Try loading the models off the migration class; default to no models.
self.models = {}
try:
self.models_source = cls.models
except AttributeError:
return
# Start a 'new' AppCache
hacks.clear_app_cache()
# Now, make each model's data into a FakeModel
# We first make entries for each model that are just its name
# This allows us to have circular model dependency loops
model_names = []
for name, data in self.models_source.items():
# Make sure there's some kind of Meta
if "Meta" not in data:
data['Meta'] = {}
try:
app_name, model_name = name.split(".", 1)
except ValueError:
app_name = self.default_app
model_name = name
name = "%s.%s" % (app_name, model_name)
name = name.lower()
self.models[name] = name
model_names.append((name, app_name, model_name, data))
for name, app_name, model_name, data in model_names:
self.models[name] = self.make_model(app_name, model_name, data)
# And perform the second run to iron out any circular/backwards depends.
self.retry_failed_fields()
# Force evaluation of relations on the models now
for model in self.models.values():
model._meta.get_all_field_names()
# Reset AppCache
hacks.unclear_app_cache()
def __iter__(self):
return iter(self.models.values())
def __getattr__(self, key):
fullname = (self.default_app+"."+key).lower()
try:
return self.models[fullname]
except KeyError:
raise AttributeError("The model '%s' from the app '%s' is not available in this migration." % (key, self.default_app))
def __getitem__(self, key):
# Detect if they asked for a field on a model or not.
if ":" in key:
key, fname = key.split(":")
else:
fname = None
# Now, try getting the model
key = key.lower()
try:
model = self.models[key]
except KeyError:
try:
app, model = key.split(".", 1)
except ValueError:
raise KeyError("The model '%s' is not in appname.modelname format." % key)
else:
raise KeyError("The model '%s' from the app '%s' is not available in this migration." % (model, app))
# If they asked for a field, get it.
if fname:
return model._meta.get_field_by_name(fname)[0]
else:
return model
def eval_in_context(self, code, app, extra_imports={}):
"Evaluates the given code in the context of the migration file."
# Drag in the migration module's locals (hopefully including models.py)
fake_locals = dict(inspect.getmodule(self.cls).__dict__)
# Remove all models from that (i.e. from modern models.py), to stop pollution
for key, value in fake_locals.items():
if isinstance(value, type) and issubclass(value, models.Model) and hasattr(value, "_meta"):
del fake_locals[key]
# We add our models into the locals for the eval
fake_locals.update(dict([
(name.split(".")[-1], model)
for name, model in self.models.items()
]))
# Make sure the ones for this app override.
fake_locals.update(dict([
(name.split(".")[-1], model)
for name, model in self.models.items()
if name.split(".")[0] == app
]))
# Ourselves as orm, to allow non-fail cross-app referencing
fake_locals['orm'] = self
# And a fake _ function
fake_locals['_'] = lambda x: x
# Datetime; there should be no datetime direct accesses
fake_locals['datetime'] = datetime
# Now, go through the requested imports and import them.
for name, value in extra_imports.items():
# First, try getting it out of locals.
parts = value.split(".")
try:
obj = fake_locals[parts[0]]
for part in parts[1:]:
obj = getattr(obj, part)
except (KeyError, AttributeError):
pass
else:
fake_locals[name] = obj
continue
# OK, try to import it directly
try:
fake_locals[name] = ask_for_it_by_name(value)
except ImportError:
if name == "SouthFieldClass":
raise ValueError("Cannot import the required field '%s'" % value)
else:
print "WARNING: Cannot import '%s'" % value
# Use ModelsLocals to make lookups work right for CapitalisedModels
fake_locals = ModelsLocals(fake_locals)
return eval(code, globals(), fake_locals)
def make_meta(self, app, model, data, stub=False):
"Makes a Meta class out of a dict of eval-able arguments."
results = {'app_label': app}
for key, code in data.items():
# Some things we never want to use.
if key in ["_bases"]:
continue
# Some things we don't want with stubs.
if stub and key in ["order_with_respect_to"]:
continue
# OK, add it.
try:
results[key] = self.eval_in_context(code, app)
except (NameError, AttributeError), e:
raise ValueError("Cannot successfully create meta field '%s' for model '%s.%s': %s." % (
key, app, model, e
))
return type("Meta", tuple(), results)
def make_model(self, app, name, data):
"Makes a Model class out of the given app name, model name and pickled data."
# Extract any bases out of Meta
if "_bases" in data['Meta']:
bases = data['Meta']['_bases']
else:
bases = ['django.db.models.Model']
# Turn the Meta dict into a basic class
meta = self.make_meta(app, name, data['Meta'], data.get("_stub", False))
failed_fields = {}
fields = {}
stub = False
# Now, make some fields!
for fname, params in data.items():
# If it's the stub marker, ignore it.
if fname == "_stub":
stub = bool(params)
continue
elif fname == "Meta":
continue
elif not params:
raise ValueError("Field '%s' on model '%s.%s' has no definition." % (fname, app, name))
elif isinstance(params, (str, unicode)):
# It's a premade definition string! Let's hope it works...
code = params
extra_imports = {}
else:
# If there's only one parameter (backwards compat), make it 3.
if len(params) == 1:
params = (params[0], [], {})
# There should be 3 parameters. Code is a tuple of (code, what-to-import)
if len(params) == 3:
code = "SouthFieldClass(%s)" % ", ".join(
params[1] +
["%s=%s" % (n, v) for n, v in params[2].items()]
)
extra_imports = {"SouthFieldClass": params[0]}
else:
raise ValueError("Field '%s' on model '%s.%s' has a weird definition length (should be 1 or 3 items)." % (fname, app, name))
try:
# Execute it in a probably-correct context.
field = self.eval_in_context(code, app, extra_imports)
except (NameError, AttributeError, AssertionError, KeyError):
# It might rely on other models being around. Add it to the
# model for the second pass.
failed_fields[fname] = (code, extra_imports)
else:
fields[fname] = field
# Find the app in the Django core, and get its module
more_kwds = {}
try:
app_module = models.get_app(app)
more_kwds['__module__'] = app_module.__name__
except ImproperlyConfigured:
# The app this belonged to has vanished, but thankfully we can still
# make a mock model, so ignore the error.
more_kwds['__module__'] = '_south_mock'
more_kwds['Meta'] = meta
# Make our model
fields.update(more_kwds)
model = type(
str(name),
tuple(map(ask_for_it_by_name, bases)),
fields,
)
# If this is a stub model, change Objects to a whiny class
if stub:
model.objects = WhinyManager()
# Also, make sure they can't instantiate it
model.__init__ = whiny_method
else:
model.objects = NoDryRunManager(model.objects)
if failed_fields:
model._failed_fields = failed_fields
return model
def retry_failed_fields(self):
"Tries to re-evaluate the _failed_fields for each model."
for modelkey, model in self.models.items():
app, modelname = modelkey.split(".", 1)
if hasattr(model, "_failed_fields"):
for fname, (code, extra_imports) in model._failed_fields.items():
try:
field = self.eval_in_context(code, app, extra_imports)
except (NameError, AttributeError, AssertionError, KeyError), e:
# It's failed again. Complain.
raise ValueError("Cannot successfully create field '%s' for model '%s': %s." % (
fname, modelname, e
))
else:
# Startup that field.
model.add_to_class(fname, field)
class WhinyManager(object):
"A fake manager that whines whenever you try to touch it. For stub models."
def __getattr__(self, key):
raise AttributeError("You cannot use items from a stub model.")
class NoDryRunManager(object):
"""
A manager that always proxies through to the real manager,
unless a dry run is in progress.
"""
def __init__(self, real):
self.real = real
def __getattr__(self, name):
if db.dry_run:
raise AttributeError("You are in a dry run, and cannot access the ORM.\nWrap ORM sections in 'if not db.dry_run:', or if the whole migration is only a data migration, set no_dry_run = True on the Migration class.")
return getattr(self.real, name)
def whiny_method(*a, **kw):
raise ValueError("You cannot instantiate a stub model.")

14
south/signals.py Normal file
View file

@@ -0,0 +1,14 @@
"""
South-specific signals
"""
from django.dispatch import Signal
# Sent at the start of the migration of an app
pre_migrate = Signal(providing_args=["app"])
# Sent after each successful migration of an app
post_migrate = Signal(providing_args=["app"])
# Sent after each run of a particular migration in a direction
ran_migration = Signal(providing_args=["app","migration","method"])
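# Illustrative only (not part of the original module): project code can listen
# for these like any Django signal. The receiver below is hypothetical.
def _example_announce(sender, app, **kwargs):
    print "South finished migrating %s" % app
post_migrate.connect(_example_announce)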

1
south/tests/.gitignore vendored Normal file
View file

@@ -0,0 +1 @@
/*.pyc

67
south/tests/__init__.py Normal file
View file

@@ -0,0 +1,67 @@
import unittest
import os
import sys
from django.conf import settings
from south.hacks import hacks
# Add the tests directory so fakeapp is on sys.path
test_root = os.path.dirname(__file__)
sys.path.append(test_root)
# Note: the individual test files are imported below this.
class Monkeypatcher(unittest.TestCase):
"""
Base test class for tests that play with the INSTALLED_APPS setting at runtime.
"""
def create_fake_app(self, name):
class Fake:
pass
fake = Fake()
fake.__name__ = name
return fake
def create_test_app(self):
class Fake:
pass
fake = Fake()
fake.__name__ = "fakeapp.migrations"
fake.__file__ = os.path.join(test_root, "fakeapp", "migrations", "__init__.py")
return fake
def setUp(self):
"""
Changes the Django environment so we can run tests against our test apps.
"""
# Set the installed apps
hacks.set_installed_apps(["fakeapp", "otherfakeapp"])
def tearDown(self):
"""
Undoes what setUp did.
"""
hacks.reset_installed_apps()
# Try importing all tests if asked for (then we can run 'em)
try:
skiptest = settings.SKIP_SOUTH_TESTS
except:
skiptest = False
if not skiptest:
from south.tests.db import *
from south.tests.logic import *
from south.tests.autodetection import *
from south.tests.logger import *
from south.tests.inspector import *
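# Illustrative only: projects that want to keep South's own suite out of their
# test runs can opt out in settings.py with:
#
#     SKIP_SOUTH_TESTS = True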

233
south/tests/autodetection.py Normal file
View file

@@ -0,0 +1,233 @@
import unittest
from south.management.commands import startmigration
class TestComparison(unittest.TestCase):
"""
Tests the comparison methods of startmigration.
"""
def test_no_change(self):
"Test with a completely unchanged definition."
self.assertEqual(
startmigration.different_attributes(
('django.db.models.fields.related.ForeignKey', [], {'to': "orm['southdemo.Lizard']"}),
('django.db.models.fields.related.ForeignKey', [], {'to': "orm['southdemo.Lizard']"}),
),
False,
)
self.assertEqual(
startmigration.different_attributes(
('django.db.models.fields.related.ForeignKey', ['ohhai', 'there'], {'to': "somewhere", "from": "there"}),
('django.db.models.fields.related.ForeignKey', ['ohhai', 'there'], {"from": "there", 'to': "somewhere"}),
),
False,
)
def test_pos_change(self):
"Test with a changed positional argument."
self.assertEqual(
startmigration.different_attributes(
('django.db.models.fields.CharField', ['hi'], {'to': "foo"}),
('django.db.models.fields.CharField', [], {'to': "foo"}),
),
True,
)
self.assertEqual(
startmigration.different_attributes(
('django.db.models.fields.CharField', [], {'to': "foo"}),
('django.db.models.fields.CharField', ['bye'], {'to': "foo"}),
),
True,
)
self.assertEqual(
startmigration.different_attributes(
('django.db.models.fields.CharField', ['pi'], {'to': "foo"}),
('django.db.models.fields.CharField', ['pi'], {'to': "foo"}),
),
False,
)
self.assertEqual(
startmigration.different_attributes(
('django.db.models.fields.CharField', ['pisdadad'], {'to': "foo"}),
('django.db.models.fields.CharField', ['pi'], {'to': "foo"}),
),
True,
)
self.assertEqual(
startmigration.different_attributes(
('django.db.models.fields.CharField', ['hi'], {}),
('django.db.models.fields.CharField', [], {}),
),
True,
)
self.assertEqual(
startmigration.different_attributes(
('django.db.models.fields.CharField', [], {}),
('django.db.models.fields.CharField', ['bye'], {}),
),
True,
)
self.assertEqual(
startmigration.different_attributes(
('django.db.models.fields.CharField', ['pi'], {}),
('django.db.models.fields.CharField', ['pi'], {}),
),
False,
)
self.assertEqual(
startmigration.different_attributes(
('django.db.models.fields.CharField', ['pi'], {}),
('django.db.models.fields.CharField', ['45fdfdf'], {}),
),
True,
)
def test_kwd_change(self):
"Test a changed keyword argument"
self.assertEqual(
startmigration.different_attributes(
('django.db.models.fields.CharField', ['pi'], {'to': "foo"}),
('django.db.models.fields.CharField', ['pi'], {'to': "blue"}),
),
True,
)
self.assertEqual(
startmigration.different_attributes(
('django.db.models.fields.CharField', [], {'to': "foo"}),
('django.db.models.fields.CharField', [], {'to': "blue"}),
),
True,
)
self.assertEqual(
startmigration.different_attributes(
('django.db.models.fields.CharField', ['b'], {'to': "foo"}),
('django.db.models.fields.CharField', ['b'], {'to': "blue"}),
),
True,
)
self.assertEqual(
startmigration.different_attributes(
('django.db.models.fields.CharField', [], {'to': "foo"}),
('django.db.models.fields.CharField', [], {}),
),
True,
)
self.assertEqual(
startmigration.different_attributes(
('django.db.models.fields.CharField', ['a'], {'to': "foo"}),
('django.db.models.fields.CharField', ['a'], {}),
),
True,
)
self.assertEqual(
startmigration.different_attributes(
('django.db.models.fields.CharField', [], {}),
('django.db.models.fields.CharField', [], {'to': "foo"}),
),
True,
)
self.assertEqual(
startmigration.different_attributes(
('django.db.models.fields.CharField', ['a'], {}),
('django.db.models.fields.CharField', ['a'], {'to': "foo"}),
),
True,
)
def test_backcompat_nochange(self):
"Test that the backwards-compatable comparison is working"
self.assertEqual(
startmigration.different_attributes(
('models.CharField', [], {}),
('django.db.models.fields.CharField', [], {}),
),
False,
)
self.assertEqual(
startmigration.different_attributes(
('models.CharField', ['ack'], {}),
('django.db.models.fields.CharField', ['ack'], {}),
),
False,
)
self.assertEqual(
startmigration.different_attributes(
('models.CharField', [], {'to':'b'}),
('django.db.models.fields.CharField', [], {'to':'b'}),
),
False,
)
self.assertEqual(
startmigration.different_attributes(
('models.CharField', ['hah'], {'to':'you'}),
('django.db.models.fields.CharField', ['hah'], {'to':'you'}),
),
False,
)
self.assertEqual(
startmigration.different_attributes(
('models.CharField', ['hah'], {'to':'you'}),
('django.db.models.fields.CharField', ['hah'], {'to':'heh'}),
),
True,
)
self.assertEqual(
startmigration.different_attributes(
('models.CharField', ['hah'], {}),
('django.db.models.fields.CharField', [], {'to':"orm['appname.hah']"}),
),
False,
)
self.assertEqual(
startmigration.different_attributes(
('models.CharField', ['hah'], {}),
('django.db.models.fields.CharField', [], {'to':'hah'}),
),
True,
)
self.assertEqual(
startmigration.different_attributes(
('models.CharField', ['hah'], {}),
('django.db.models.fields.CharField', [], {'to':'rrr'}),
),
True,
)
self.assertEqual(
startmigration.different_attributes(
('models.CharField', ['hah'], {}),
('django.db.models.fields.IntField', [], {'to':'hah'}),
),
True,
)

357
south/tests/db.py Normal file
View file

@@ -0,0 +1,357 @@
import unittest
from south.db import db
from django.db import connection, models
# Create a list of error classes from the various database libraries
errors = []
try:
from psycopg2 import ProgrammingError
errors.append(ProgrammingError)
except ImportError:
pass
errors = tuple(errors)
class TestOperations(unittest.TestCase):
"""
Tests if the various DB abstraction calls work.
Can only test a limited amount due to DB differences.
"""
def setUp(self):
db.debug = False
db.clear_deferred_sql()
def test_create(self):
"""
Test creation and deletion of tables.
"""
cursor = connection.cursor()
# It needs to take at least 2 args
self.assertRaises(TypeError, db.create_table)
self.assertRaises(TypeError, db.create_table, "test1")
# Empty tables (i.e. no columns) are not allowed, so create at least one column
db.create_table("test1", [('email_confirmed', models.BooleanField(default=False))])
db.start_transaction()
# And should exist
cursor.execute("SELECT * FROM test1")
# Make sure we can't run the same query on a non-existent table
try:
cursor.execute("SELECT * FROM nottheretest1")
self.fail("Non-existent table could be selected!")
except:
pass
# Clear the dirty transaction
db.rollback_transaction()
db.start_transaction()
# Remove the table
db.drop_table("test1")
# Make sure it went
try:
cursor.execute("SELECT * FROM test1")
self.fail("Just-deleted table could be selected!")
except:
pass
# Clear the dirty transaction
db.rollback_transaction()
db.start_transaction()
# Try deleting a nonexistent one
try:
db.delete_table("nottheretest1")
self.fail("Non-existent table could be deleted!")
except:
pass
db.rollback_transaction()
    def test_foreign_keys(self):
        """
        Tests foreign key creation, especially uppercase (see #61)
        """
        Test = db.mock_model(model_name='Test', db_table='test5a',
                             db_tablespace='', pk_field_name='ID',
                             pk_field_type=models.AutoField, pk_field_args=[])
        db.start_transaction()
        db.create_table("test5a", [('ID', models.AutoField(verbose_name='ID', primary_key=True, auto_created=True))])
        db.create_table("test5b", [
            ('id', models.AutoField(verbose_name='ID', primary_key=True, auto_created=True)),
            ('UNIQUE', models.ForeignKey(Test)),
        ])
        db.execute_deferred_sql()
        db.rollback_transaction()
    def test_rename(self):
        """
        Test column renaming
        """
        cursor = connection.cursor()
        db.create_table("test_rn", [('spam', models.BooleanField(default=False))])
        db.start_transaction()
        # Make sure we can select the column
        cursor.execute("SELECT spam FROM test_rn")
        # Rename it
        db.rename_column("test_rn", "spam", "eggs")
        cursor.execute("SELECT eggs FROM test_rn")
        try:
            cursor.execute("SELECT spam FROM test_rn")
        except:
            pass
        else:
            self.fail("Just-renamed column could be selected!")
        db.rollback_transaction()
        db.delete_table("test_rn")
    def test_dry_rename(self):
        """
        Test column renaming while --dry-run is turned on (should do nothing)
        See ticket #65
        """
        cursor = connection.cursor()
        db.create_table("test_drn", [('spam', models.BooleanField(default=False))])
        db.start_transaction()
        # Make sure we can select the column
        cursor.execute("SELECT spam FROM test_drn")
        # Rename it, but in dry-run mode, so nothing should actually change
        db.dry_run = True
        db.rename_column("test_drn", "spam", "eggs")
        db.dry_run = False
        cursor.execute("SELECT spam FROM test_drn")
        try:
            cursor.execute("SELECT eggs FROM test_drn")
        except:
            pass
        else:
            self.fail("Dry-renamed new column could be selected!")
        db.rollback_transaction()
        db.delete_table("test_drn")
    def test_table_rename(self):
        """
        Test table renaming
        """
        cursor = connection.cursor()
        db.create_table("testtr", [('spam', models.BooleanField(default=False))])
        db.start_transaction()
        # Make sure we can select the column
        cursor.execute("SELECT spam FROM testtr")
        # Rename the table
        db.rename_table("testtr", "testtr2")
        cursor.execute("SELECT spam FROM testtr2")
        try:
            cursor.execute("SELECT spam FROM testtr")
        except:
            pass
        else:
            self.fail("Just-renamed table could be selected!")
        db.rollback_transaction()
        db.delete_table("testtr2")
    def test_index(self):
        """
        Test the index operations
        """
        db.create_table("test3", [
            ('SELECT', models.BooleanField(default=False)),
            ('eggs', models.IntegerField(unique=True)),
        ])
        db.execute_deferred_sql()
        db.start_transaction()
        # Add an index on that column
        db.create_index("test3", ["SELECT"])
        # Add another index on two columns
        db.create_index("test3", ["SELECT", "eggs"])
        # Delete them both
        db.delete_index("test3", ["SELECT"])
        db.delete_index("test3", ["SELECT", "eggs"])
        # Delete the unique index/constraint
        db.delete_unique("test3", ["eggs"])
        db.rollback_transaction()
        db.delete_table("test3")
    def test_primary_key(self):
        """
        Test the primary key operations
        """
        db.create_table("test_pk", [
            ('id', models.IntegerField(primary_key=True)),
            ('new_pkey', models.IntegerField()),
            ('eggs', models.IntegerField(unique=True)),
        ])
        db.execute_deferred_sql()
        db.start_transaction()
        # Remove the default primary key, and make new_pkey the primary key instead
        db.drop_primary_key("test_pk")
        db.create_primary_key("test_pk", "new_pkey")
        # Two rows sharing an id are now valid, since id is no longer the primary key
        db.execute("INSERT INTO test_pk (id, new_pkey, eggs) VALUES (1, 2, 3), (1, 3, 4)")
        db.rollback_transaction()
        db.delete_table("test_pk")
    def test_alter(self):
        """
        Test altering columns/tables
        """
        db.create_table("test4", [
            ('spam', models.BooleanField(default=False)),
            ('eggs', models.IntegerField()),
        ])
        db.start_transaction()
        # Add a column
        db.add_column("test4", "add1", models.IntegerField(default=3), keep_default=False)
        # Add a FK with keep_default=False (#69)
        User = db.mock_model(model_name='User', db_table='auth_user', db_tablespace='', pk_field_name='id', pk_field_type=models.AutoField, pk_field_args=[], pk_field_kwargs={})
        db.add_column("test4", "user", models.ForeignKey(User, null=True), keep_default=False)
        db.delete_column("test4", "add1")
        db.rollback_transaction()
        db.delete_table("test4")
    def test_alter_column_postgres_multiword(self):
        """
        Tests altering columns with multi-word types in Postgres (issue #125)
        e.g. 'timestamp with time zone', see django/db/backends/postgresql/creation.py
        """
        db.create_table("test_multiword", [
            ('col_datetime', models.DateTimeField(null=True)),
            ('col_integer', models.PositiveIntegerField(null=True)),
            ('col_smallint', models.PositiveSmallIntegerField(null=True)),
            ('col_float', models.FloatField(null=True)),
        ])
        # Test that 'double precision' is preserved
        db.alter_column('test_multiword', 'col_float', models.FloatField('float', null=True))
        # Test that 'CHECK ("%(column)s" >= 0)' is stripped
        db.alter_column('test_multiword', 'col_integer', models.PositiveIntegerField(null=True))
        db.alter_column('test_multiword', 'col_smallint', models.PositiveSmallIntegerField(null=True))
        # Test that 'with time zone' is preserved
        if db.backend_name == "postgres":
            db.start_transaction()
            db.execute("INSERT INTO test_multiword (col_datetime) VALUES ('2009-04-24 14:20:55+02')")
            db.alter_column('test_multiword', 'col_datetime', models.DateTimeField(auto_now=True))
            assert db.execute("SELECT col_datetime = '2009-04-24 14:20:55+02' FROM test_multiword")[0][0]
            db.rollback_transaction()
        db.delete_table("test_multiword")
    def test_alter_constraints(self):
        """
        Tests that going from a PositiveIntegerField to an IntegerField drops
        the constraint on the database.
        """
        db.create_table("test_alterc", [
            ('num', models.PositiveIntegerField()),
        ])
        # Add in some test values
        db.execute("INSERT INTO test_alterc (num) VALUES (1), (2)")
        # Ensure that inserting a negative number fails while the CHECK constraint is present
        db.start_transaction()
        try:
            db.execute("INSERT INTO test_alterc (num) VALUES (-3)")
        except:
            db.rollback_transaction()
        else:
            self.fail("Could insert a negative integer into a PositiveIntegerField.")
        # Alter it to a normal IntegerField
        db.alter_column("test_alterc", "num", models.IntegerField())
        # It should now work
        db.execute("INSERT INTO test_alterc (num) VALUES (-3)")
        db.delete_table("test_alterc")
    def test_unique(self):
        """
        Tests creating/deleting unique constraints.
        """
        db.create_table("test_unique2", [
            ('id', models.AutoField(primary_key=True)),
        ])
        db.create_table("test_unique", [
            ('spam', models.BooleanField(default=False)),
            ('eggs', models.IntegerField()),
            ('ham', models.ForeignKey(db.mock_model('Unique2', 'test_unique2'))),
        ])
        # Add a constraint
        db.create_unique("test_unique", ["spam"])
        # Shouldn't do anything during dry-run
        db.dry_run = True
        db.delete_unique("test_unique", ["spam"])
        db.dry_run = False
        db.delete_unique("test_unique", ["spam"])
        db.create_unique("test_unique", ["spam"])
        db.start_transaction()
        # Test it works
        db.execute("INSERT INTO test_unique2 (id) VALUES (1), (2)")
        db.execute("INSERT INTO test_unique (spam, eggs, ham_id) VALUES (true, 0, 1), (false, 1, 2)")
        try:
            db.execute("INSERT INTO test_unique (spam, eggs, ham_id) VALUES (true, 2, 1)")
        except:
            db.rollback_transaction()
        else:
            self.fail("Could insert non-unique item.")
        # Drop that, add one only on eggs
        db.delete_unique("test_unique", ["spam"])
        db.execute("DELETE FROM test_unique")
        db.create_unique("test_unique", ["eggs"])
        db.start_transaction()
        # Test similarly
        db.execute("INSERT INTO test_unique (spam, eggs, ham_id) VALUES (true, 0, 1), (false, 1, 2)")
        try:
            db.execute("INSERT INTO test_unique (spam, eggs, ham_id) VALUES (true, 1, 1)")
        except:
            db.rollback_transaction()
        else:
            self.fail("Could insert non-unique item.")
        # Drop those, test combined constraints
        db.delete_unique("test_unique", ["eggs"])
        db.execute("DELETE FROM test_unique")
        db.create_unique("test_unique", ["spam", "eggs", "ham_id"])
        db.start_transaction()
        # Test similarly
        db.execute("INSERT INTO test_unique (spam, eggs, ham_id) VALUES (true, 0, 1), (false, 1, 1)")
        try:
            db.execute("INSERT INTO test_unique (spam, eggs, ham_id) VALUES (true, 0, 1)")
        except:
            db.rollback_transaction()
        else:
            self.fail("Could insert non-unique pair.")
        db.delete_unique("test_unique", ["spam", "eggs", "ham_id"])
    def test_capitalised_constraints(self):
        """
        Under PostgreSQL at least, capitalised constraints must be quoted.
        """
        db.start_transaction()
        try:
            db.create_table("test_capconst", [
                ('SOMECOL', models.PositiveIntegerField(primary_key=True)),
            ])
            # Alter it so it no longer has the check constraint
            db.alter_column("test_capconst", "SOMECOL", models.IntegerField())
        finally:
            db.rollback_transaction()
    def test_text_default(self):
        """
        MySQL cannot have blank defaults on TEXT columns.
        """
        db.start_transaction()
        try:
            db.create_table("test_textdef", [
                ('textcol', models.TextField(blank=True)),
            ])
        finally:
            db.rollback_transaction()
    def test_add_unique_fk(self):
        """
        Test adding a ForeignKey with unique=True or a OneToOneField
        """
        db.create_table("test_add_unique_fk", [
            ('spam', models.BooleanField(default=False))
        ])
        db.start_transaction()
        db.add_column("test_add_unique_fk", "mock1", models.ForeignKey(db.mock_model('Mock', 'mock'), null=True, unique=True))
        db.add_column("test_add_unique_fk", "mock2", models.OneToOneField(db.mock_model('Mock', 'mock'), null=True))
        db.rollback_transaction()
        db.delete_table("test_add_unique_fk")

1
south/tests/fakeapp/.gitignore vendored Normal file
View file

@ -0,0 +1 @@
/*.pyc

south/tests/fakeapp/__init__.py Normal file
View file

1
south/tests/fakeapp/migrations/.gitignore vendored Normal file
View file

@ -0,0 +1 @@
/*.pyc

19
south/tests/fakeapp/migrations/0001_spam.py Normal file
View file

@ -0,0 +1,19 @@
from south.db import db
from django.db import models

class Migration:

    def forwards(self):
        # Model 'Spam'
        db.create_table("southtest_spam", (
            ('id', models.AutoField(verbose_name='ID', primary_key=True, auto_created=True)),
            ('weight', models.FloatField()),
            ('expires', models.DateTimeField()),
            ('name', models.CharField(max_length=255))
        ))

    def backwards(self):
        db.delete_table("southtest_spam")

20
south/tests/fakeapp/migrations/0002_eggs.py Normal file
View file

@ -0,0 +1,20 @@
from south.db import db
from django.db import models

class Migration:

    def forwards(self):
        Spam = db.mock_model(model_name='Spam', db_table='southtest_spam', db_tablespace='', pk_field_name='id', pk_field_type=models.AutoField)
        db.create_table("southtest_eggs", (
            ('id', models.AutoField(verbose_name='ID', primary_key=True, auto_created=True)),
            ('size', models.FloatField()),
            ('quantity', models.IntegerField()),
            ('spam', models.ForeignKey(Spam)),
        ))

    def backwards(self):
        db.delete_table("southtest_eggs")

22
south/tests/fakeapp/migrations/0003_alter_spam.py Normal file
View file

@ -0,0 +1,22 @@
from south.db import db
from django.db import models

class Migration:

    needed_by = (
        ("otherfakeapp", "0003_third"),
    )

    def forwards(self):
        db.alter_column("southtest_spam", 'name', models.CharField(max_length=255, null=True))

    def backwards(self):
        db.alter_column("southtest_spam", 'name', models.CharField(max_length=255))

    models = {
        "fakeapp.bug135": {
            'date': ('models.DateTimeField', [], {'default': 'datetime.datetime(2009, 5, 6, 15, 33, 15, 780013)'}),
        }
    }
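The trailing models dict is the frozen-model format: its entries are the same (class path, args, kwargs) triples that the different_attributes tests earlier compare. Note that the default is captured as the source string of a datetime.datetime(...) call, not as a live object.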

55
south/tests/fakeapp/models.py Normal file
View file

@ -0,0 +1,55 @@
# -*- coding: UTF-8 -*-

from django.db import models
from django.contrib.auth.models import User as UserAlias

def default_func():
    return "yays"

# An empty case.
class Other1(models.Model): pass

# Nastiness.
class HorribleModel(models.Model):
    "A model to test the edge cases of model parsing"
    
    ZERO, ONE = range(2)
    
    # First, some nice fields
    name = models.CharField(max_length=255)
    short_name = models.CharField(max_length=50)
    slug = models.SlugField(unique=True)
    
    # A ForeignKey, to a model above, and then below
    o1 = models.ForeignKey(Other1)
    o2 = models.ForeignKey('Other2')
    
    # Now to something outside
    user = models.ForeignKey(UserAlias, related_name="horribles")
    
    # Unicode!
    code = models.CharField(max_length=25, default="↑↑↓↓←→←→BA")
    
    # Odd defaults!
    class_attr = models.IntegerField(default=ZERO)
    func = models.CharField(max_length=25, default=default_func)
    
    # Time to get nasty. Define a non-field choices, and use it
    choices = [('hello', '1'), ('world', '2')]
    choiced = models.CharField(max_length=20, choices=choices)
    
    class Meta:
        db_table = "my_fave"
        verbose_name = "Dr. Strangelove," + \
            """or how I learned to stop worrying
and love the bomb"""
    
    # Now spread over multiple lines
    multiline = \
        models.TextField(
        )

# Special case.
class Other2(models.Model):
    # Try loading a field without a newline after it (inspect hates this)
    close_but_no_cigar = models.PositiveIntegerField(primary_key=True)
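This deliberately nasty model is the fixture for the inspector tests that follow: the unicode default, class-attribute default, callable default, non-field choices attribute, multi-line field definition, and missing trailing newline are each edge cases a source-inspecting freezer has to survive.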

49
south/tests/inspector.py Normal file
View file

@ -0,0 +1,49 @@
import unittest

from south.tests import Monkeypatcher
from south.modelsinspector import *
from fakeapp.models import HorribleModel

class TestModelInspector(Monkeypatcher):
    """
    Tests that the various parts of the modelsinspector work.
    """
    
    def test_get_value(self):
        
        # Let's start nicely.
        name = HorribleModel._meta.get_field_by_name("name")[0]
        slug = HorribleModel._meta.get_field_by_name("slug")[0]
        user = HorribleModel._meta.get_field_by_name("user")[0]
        
        # Simple int retrieval (comes back as its source string)
        self.assertEqual(
            get_value(name, ["max_length", {}]),
            "255",
        )
        
        # Bool retrieval
        self.assertEqual(
            get_value(slug, ["unique", {}]),
            "True",
        )
        
        # String retrieval
        self.assertEqual(
            get_value(user, ["rel.related_name", {}]),
            "'horribles'",
        )
        
        # Default triggering
        self.assertEqual(
            get_value(slug, ["unique", {"default": False}]),
            "True",
        )
        self.assertRaises(
            IsDefault,
            get_value,
            slug,
            ["unique", {"default": True}],
        )
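Read together, the assertions give get_value's contract: it returns the source-code string for a field attribute (so a number comes back as "255" and a boolean as "True"), and raises IsDefault when the current value matches the supplied default, so the caller can omit it from the frozen output. A small sketch restating that contract (the max_length case is extrapolated from the unique case actually tested above):

    name = HorribleModel._meta.get_field_by_name("name")[0]
    print get_value(name, ["max_length", {}])    # '255' -- a string, not an int
    try:
        get_value(name, ["max_length", {"default": 255}])
    except IsDefault:
        pass   # value matches the default, so leave it out of the migration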

54
south/tests/logger.py Normal file
View file

@ -0,0 +1,54 @@
import os
import unittest
from django.conf import settings
from django.db import connection, models
from south.db import db
class TestLogger(unittest.TestCase):
    """
    Tests the various logging functions.
    """
    
    def setUp(self):
        db.debug = False
        self.test_path = os.path.join(os.path.dirname(__file__), "test.log")
    
    def test_db_execute_logging_nofile(self):
        """ Does logging degrade nicely if SOUTH_LOGGING_ON is not set?
        """
        settings.SOUTH_LOGGING_ON = False   # this needs to be set to False
                                            # to avoid issues where other tests
                                            # set this to True. settings is shared
                                            # between these tests.
        db.create_table("test9", [('email_confirmed', models.BooleanField(default=False))])
    
    def test_db_execute_logging_validfile(self):
        """ Does logging work when passing in a valid file?
        """
        settings.SOUTH_LOGGING_ON = True
        settings.SOUTH_LOGGING_FILE = self.test_path
        db.create_table("test10", [('email_confirmed', models.BooleanField(default=False))])
        # Remove the test log file
        os.remove(self.test_path)
    
    def test_db_execute_logging_missingfilename(self):
        """ Does logging raise an error if the filename is missing?
        """
        settings.SOUTH_LOGGING_ON = True
        settings.SOUTH_LOGGING_FILE = None
        self.assertRaises(IOError,
            db.create_table, "test11", [('email_confirmed', models.BooleanField(default=False))])

271
south/tests/logic.py Normal file
View file

@ -0,0 +1,271 @@
import unittest
import datetime
import sys
import os
import StringIO

from south import migration
from south.tests import Monkeypatcher
from south.utils import snd

class TestMigrationLogic(Monkeypatcher):
    """
    Tests that the various logic functions in migration actually work.
    """
    
    def test_get_app_name(self):
        self.assertEqual(
            "southtest",
            migration.get_app_name(self.create_fake_app("southtest.migrations")),
        )
        self.assertEqual(
            "baz",
            migration.get_app_name(self.create_fake_app("foo.bar.baz.migrations")),
        )
    
    def test_get_migrated_apps(self):
        P1 = __import__("fakeapp.migrations", {}, {}, [''])
        P2 = __import__("otherfakeapp.migrations", {}, {}, [''])
        self.assertEqual(
            [P1, P2],
            list(migration.get_migrated_apps()),
        )
    
    def test_get_app(self):
        P1 = __import__("fakeapp.migrations", {}, {}, [''])
        self.assertEqual(P1, migration.get_app("fakeapp"))
        self.assertEqual(P1, migration.get_app(self.create_fake_app("fakeapp.models")))
    
    def test_get_app_fullname(self):
        self.assertEqual(
            "southtest",
            migration.get_app_fullname(self.create_fake_app("southtest.migrations")),
        )
        self.assertEqual(
            "foo.bar.baz",
            migration.get_app_fullname(self.create_fake_app("foo.bar.baz.migrations")),
        )
    
    def test_get_migration_names(self):
        app = self.create_test_app()
        self.assertEqual(
            ["0001_spam", "0002_eggs", "0003_alter_spam"],
            migration.get_migration_names(app),
        )
    
    def test_get_migration_classes(self):
        app = self.create_test_app()
        # Can't use a vanilla import: module names starting with a digit aren't valid in import statements
        M1 = __import__("fakeapp.migrations.0001_spam", {}, {}, ['Migration']).Migration
        M2 = __import__("fakeapp.migrations.0002_eggs", {}, {}, ['Migration']).Migration
        M3 = __import__("fakeapp.migrations.0003_alter_spam", {}, {}, ['Migration']).Migration
        self.assertEqual(
            [M1, M2, M3],
            list(migration.get_migration_classes(app)),
        )
    
    def test_get_migration(self):
        app = self.create_test_app()
        # Can't use a vanilla import: module names starting with a digit aren't valid in import statements
        M1 = __import__("fakeapp.migrations.0001_spam", {}, {}, ['Migration']).Migration
        M2 = __import__("fakeapp.migrations.0002_eggs", {}, {}, ['Migration']).Migration
        self.assertEqual(M1, migration.get_migration(app, "0001_spam"))
        self.assertEqual(M2, migration.get_migration(app, "0002_eggs"))
        # Temporarily redirect sys.stdout during this, it whinges.
        stdout, sys.stdout = sys.stdout, StringIO.StringIO()
        try:
            self.assertRaises((ImportError, ValueError), migration.get_migration, app, "0001_jam")
        finally:
            sys.stdout = stdout
    def test_all_migrations(self):
        app = migration.get_app("fakeapp")
        otherapp = migration.get_app("otherfakeapp")
        self.assertEqual({
                app: {
                    "0001_spam": migration.get_migration(app, "0001_spam"),
                    "0002_eggs": migration.get_migration(app, "0002_eggs"),
                    "0003_alter_spam": migration.get_migration(app, "0003_alter_spam"),
                },
                otherapp: {
                    "0001_first": migration.get_migration(otherapp, "0001_first"),
                    "0002_second": migration.get_migration(otherapp, "0002_second"),
                    "0003_third": migration.get_migration(otherapp, "0003_third"),
                },
            },
            migration.all_migrations(),
        )
    
    def assertListEqual(self, list1, list2):
        list1 = list(list1)
        list2 = list(list2)
        list1.sort()
        list2.sort()
        return self.assertEqual(list1, list2)
    
    def test_apply_migrations(self):
        migration.MigrationHistory.objects.all().delete()
        app = migration.get_app("fakeapp")
        # We should start with no migrations
        self.assertEqual(list(migration.MigrationHistory.objects.all()), [])
        # Apply them normally
        tree = migration.dependency_tree()
        migration.migrate_app(app, tree, target_name=None, resolve_mode=None, fake=False, verbosity=0)
        # We should finish with all migrations
        self.assertListEqual(
            (
                (u"fakeapp", u"0001_spam"),
                (u"fakeapp", u"0002_eggs"),
                (u"fakeapp", u"0003_alter_spam"),
            ),
            migration.MigrationHistory.objects.values_list("app_name", "migration"),
        )
        # Now roll them backwards
        migration.migrate_app(app, tree, target_name="zero", resolve_mode=None, fake=False, verbosity=0)
        # Finish with none
        self.assertEqual(list(migration.MigrationHistory.objects.all()), [])
    def test_migration_merge_forwards(self):
        migration.MigrationHistory.objects.all().delete()
        app = migration.get_app("fakeapp")
        # We should start with no migrations
        self.assertEqual(list(migration.MigrationHistory.objects.all()), [])
        # Insert one in the wrong order
        migration.MigrationHistory.objects.create(
            app_name = "fakeapp",
            migration = "0002_eggs",
            applied = datetime.datetime.now(),
        )
        # Did it go in?
        self.assertListEqual(
            (
                (u"fakeapp", u"0002_eggs"),
            ),
            migration.MigrationHistory.objects.values_list("app_name", "migration"),
        )
        # Apply them normally
        tree = migration.dependency_tree()
        try:
            # Redirect the error it will print to nowhere
            stdout, sys.stdout = sys.stdout, StringIO.StringIO()
            migration.migrate_app(app, tree, target_name=None, resolve_mode=None, fake=False, verbosity=0)
            sys.stdout = stdout
        except SystemExit:
            pass
        # Nothing should have changed (no merge mode!)
        self.assertListEqual(
            (
                (u"fakeapp", u"0002_eggs"),
            ),
            migration.MigrationHistory.objects.values_list("app_name", "migration"),
        )
        # Apply with merge
        migration.migrate_app(app, tree, target_name=None, resolve_mode="merge", fake=False, verbosity=0)
        # We should finish with all migrations
        self.assertListEqual(
            (
                (u"fakeapp", u"0001_spam"),
                (u"fakeapp", u"0002_eggs"),
                (u"fakeapp", u"0003_alter_spam"),
            ),
            migration.MigrationHistory.objects.values_list("app_name", "migration"),
        )
        # Now roll them backwards
        migration.migrate_app(app, tree, target_name="0002", resolve_mode=None, fake=False, verbosity=0)
        migration.migrate_app(app, tree, target_name="0001", resolve_mode=None, fake=True, verbosity=0)
        migration.migrate_app(app, tree, target_name="zero", resolve_mode=None, fake=False, verbosity=0)
        # Finish with none
        self.assertEqual(list(migration.MigrationHistory.objects.all()), [])
    def test_alter_column_null(self):
        
        def null_ok():
            from django.db import connection, transaction
            # The DBAPI introspection module fails on postgres NULLs.
            cursor = connection.cursor()
            try:
                cursor.execute("INSERT INTO southtest_spam (id, weight, expires, name) VALUES (100, 10.1, now(), NULL);")
            except:
                transaction.rollback()
                return False
            else:
                cursor.execute("DELETE FROM southtest_spam")
                transaction.commit()
                return True
        
        app = migration.get_app("fakeapp")
        tree = migration.dependency_tree()
        self.assertEqual(list(migration.MigrationHistory.objects.all()), [])
        # By default, name is NOT NULL
        migration.migrate_app(app, tree, target_name="0002", resolve_mode=None, fake=False, verbosity=0)
        self.failIf(null_ok())
        # After 0003, it should be NULL
        migration.migrate_app(app, tree, target_name="0003", resolve_mode=None, fake=False, verbosity=0)
        self.assert_(null_ok())
        # Make sure it is NOT NULL again
        migration.migrate_app(app, tree, target_name="0002", resolve_mode=None, fake=False, verbosity=0)
        self.failIf(null_ok(), 'name is still nullable after migrating back to 0002')
        # Finish with no migrations, otherwise other tests fail...
        migration.migrate_app(app, tree, target_name="zero", resolve_mode=None, fake=False, verbosity=0)
        self.assertEqual(list(migration.MigrationHistory.objects.all()), [])
    def test_dependencies(self):
        fakeapp = migration.get_app("fakeapp")
        otherfakeapp = migration.get_app("otherfakeapp")
        # Test a simple path
        tree = migration.dependency_tree()
        self.assertEqual(
            map(snd, migration.needed_before_forwards(tree, fakeapp, "0003_alter_spam")),
            ['0001_spam', '0002_eggs'],
        )
        # And a complex one, with both forwards (depends_on) and backwards (needed_by) dependencies
        self.assertEqual(
            map(snd, migration.needed_before_forwards(tree, otherfakeapp, "0003_third")),
            ['0001_spam', '0001_first', '0002_second', '0002_eggs', '0003_alter_spam'],
        )
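The second expected list is worth unpacking: within otherfakeapp, 0003_third needs 0002_second, which needs 0001_first; 0001_first declares depends_on fakeapp's 0001_spam (see its source below); and fakeapp's 0003_alter_spam declares needed_by otherfakeapp's 0003_third, which pulls 0002_eggs and 0003_alter_spam in as well. That is where the interleaved ordering across the two apps comes from.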

1
south/tests/otherfakeapp/.gitignore vendored Normal file
View file

@ -0,0 +1 @@
/*.pyc

south/tests/otherfakeapp/__init__.py Normal file
View file

1
south/tests/otherfakeapp/migrations/.gitignore vendored Normal file
View file

@ -0,0 +1 @@
/*.pyc

15
south/tests/otherfakeapp/migrations/0001_first.py Normal file
View file

@ -0,0 +1,15 @@
from south.db import db
from django.db import models

class Migration:

    depends_on = (
        ("fakeapp", "0001_spam"),
    )

    def forwards(self):
        pass

    def backwards(self):
        pass
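Together with the needed_by tuple declared in fakeapp's 0003_alter_spam, this depends_on gives the cross-app graph that test_dependencies in south/tests/logic.py checks: depends_on pulls another app's migration in before this one runs, while needed_by pushes the declaring migration in before another app's.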

11
south/tests/otherfakeapp/migrations/0002_second.py Normal file
View file

@ -0,0 +1,11 @@
from south.db import db
from django.db import models

class Migration:

    def forwards(self):
        pass

    def backwards(self):
        pass

10
south/tests/otherfakeapp/migrations/0003_third.py Normal file
View file

@ -0,0 +1,10 @@
from south.db import db
from django.db import models

class Migration:

    def forwards(self):
        pass

    def backwards(self):
        pass

1
south/tests/otherfakeapp/models.py Normal file
View file

@ -0,0 +1 @@
# This file left intentionally blank.

39
south/utils.py Normal file
View file

@ -0,0 +1,39 @@
"""
Generally helpful utility functions.
"""
def _ask_for_it_by_name(name):
"Returns an object referenced by absolute path."
bits = name.split(".")
## what if there is no absolute reference?
if len(bits)>1:
modulename = ".".join(bits[:-1])
else:
modulename=bits[0]
module = __import__(modulename, {}, {}, bits[-1])
return getattr(module, bits[-1])
def ask_for_it_by_name(name):
"Returns an object referenced by absolute path. (Memoised outer wrapper)"
if name not in ask_for_it_by_name.cache:
ask_for_it_by_name.cache[name] = _ask_for_it_by_name(name)
return ask_for_it_by_name.cache[name]
ask_for_it_by_name.cache = {}
def get_attribute(item, attribute):
"""
Like getattr, but recursive (i.e. you can ask for 'foo.bar.yay'.)
"""
value = item
for part in attribute.split("."):
value = getattr(value, part)
return value
fst = lambda (x, y): x
snd = lambda (x, y): y
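A short usage sketch of these helpers (Python 2; the dotted names are just examples):

    from south.utils import ask_for_it_by_name, get_attribute, fst, snd

    # Import an object by dotted path; results are memoised in a cache dict
    TextField = ask_for_it_by_name("django.db.models.TextField")
    # Recursive getattr over a dotted attribute path
    print get_attribute(TextField, "__name__")   # 'TextField'
    # Tuple helpers (note the Python 2 tuple-unpacking lambdas)
    print fst((1, 2)), snd((1, 2))               # 1 2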