Added head revision of django-permissions and django-workflows. See #535
- Legacy-Id: 2598
This commit is contained in:
parent
8f02f16a67
commit
3a5a5f0e47
ietf
permissions
__init__.pyadmin.pybackend.pyexceptions.py
fixtures
locale/de/LC_MESSAGES
models.pytemplatetags
tests.pyutils.pyworkflows
|
@ -118,6 +118,8 @@ INSTALLED_APPS = (
|
|||
'django.contrib.admin',
|
||||
'django.contrib.humanize',
|
||||
'south',
|
||||
'workflows',
|
||||
'permissions',
|
||||
'ietf.announcements',
|
||||
'ietf.idindex',
|
||||
'ietf.idtracker',
|
||||
|
|
148
permissions/__init__.py
Normal file
148
permissions/__init__.py
Normal file
|
@ -0,0 +1,148 @@
|
|||
import permissions.utils
|
||||
|
||||
class PermissionBase(object):
|
||||
"""Mix-in class for permissions.
|
||||
"""
|
||||
def grant_permission(self, role, permission):
|
||||
"""Grants passed permission to passed role. Returns True if the
|
||||
permission was able to be added, otherwise False.
|
||||
|
||||
**Parameters:**
|
||||
|
||||
role
|
||||
The role for which the permission should be granted.
|
||||
|
||||
permission
|
||||
The permission which should be granted. Either a permission
|
||||
object or the codename of a permission.
|
||||
"""
|
||||
return permissions.utils.grant_permission(self, role, permission)
|
||||
|
||||
def remove_permission(self, role, permission):
|
||||
"""Removes passed permission from passed role. Returns True if the
|
||||
permission has been removed.
|
||||
|
||||
**Parameters:**
|
||||
|
||||
role
|
||||
The role for which a permission should be removed.
|
||||
|
||||
permission
|
||||
The permission which should be removed. Either a permission object
|
||||
or the codename of a permission.
|
||||
"""
|
||||
return permissions.utils.remove_permission(self, role, permission)
|
||||
|
||||
def has_permission(self, user, permission, roles=[]):
|
||||
"""Returns True if the passed user has passed permission for this
|
||||
instance. Otherwise False.
|
||||
|
||||
**Parameters:**
|
||||
|
||||
permission
|
||||
The permission's codename which should be checked. Must be a
|
||||
string with a valid codename.
|
||||
|
||||
user
|
||||
The user for which the permission should be checked.
|
||||
|
||||
roles
|
||||
If passed, these roles will be assigned to the user temporarily
|
||||
before the permissions are checked.
|
||||
"""
|
||||
return permissions.utils.has_permission(self, user, permission, roles)
|
||||
|
||||
def check_permission(self, user, permission, roles=[]):
|
||||
"""Raise Unauthorized if the the passed user hasn't passed permission
|
||||
for this instance.
|
||||
|
||||
**Parameters:**
|
||||
|
||||
permission
|
||||
The permission's codename which should be checked. Must be a
|
||||
string with a valid codename.
|
||||
|
||||
user
|
||||
The user for which the permission should be checked.
|
||||
|
||||
roles
|
||||
If passed, these roles will be assigned to the user temporarily
|
||||
before the permissions are checked.
|
||||
"""
|
||||
if not self.has_permission(user, permission, roles):
|
||||
raise Unauthorized("User %s doesn't have permission %s for object %s" % (user, permission, obj.slug))
|
||||
|
||||
def add_inheritance_block(self, permission):
|
||||
"""Adds an inheritance block for the passed permission.
|
||||
|
||||
**Parameters:**
|
||||
|
||||
permission
|
||||
The permission for which an inheritance block should be added.
|
||||
Either a permission object or the codename of a permission.
|
||||
"""
|
||||
return permissions.utils.add_inheritance_block(self, permission)
|
||||
|
||||
def remove_inheritance_block(self, permission):
|
||||
"""Removes a inheritance block for the passed permission.
|
||||
|
||||
**Parameters:**
|
||||
|
||||
permission
|
||||
The permission for which an inheritance block should be removed.
|
||||
Either a permission object or the codename of a permission.
|
||||
"""
|
||||
return permissions.utils.remove_inheritance_block(self, permission)
|
||||
|
||||
def is_inherited(self, codename):
|
||||
"""Returns True if the passed permission is inherited.
|
||||
|
||||
**Parameters:**
|
||||
|
||||
codename
|
||||
The permission which should be checked. Must be the codename of
|
||||
the permission.
|
||||
"""
|
||||
return permissions.utils.is_inherited(self, codename)
|
||||
|
||||
def add_role(self, principal, role):
|
||||
"""Adds a local role for the principal.
|
||||
|
||||
**Parameters:**
|
||||
|
||||
principal
|
||||
The principal (user or group) which gets the role.
|
||||
|
||||
role
|
||||
The role which is assigned.
|
||||
"""
|
||||
return permissions.utils.add_local_role(self, principal, role)
|
||||
|
||||
def get_roles(self, principal):
|
||||
"""Returns *direct* local roles for passed principal (user or group).
|
||||
"""
|
||||
return permissions.utils.get_local_roles(self, principal)
|
||||
|
||||
def remove_role(self, principal, role):
|
||||
"""Adds a local role for the principal to the object.
|
||||
|
||||
**Parameters:**
|
||||
|
||||
principal
|
||||
The principal (user or group) from which the role is removed.
|
||||
|
||||
role
|
||||
The role which is removed.
|
||||
"""
|
||||
return permissions.utils.remove_local_role(self, principal, role)
|
||||
|
||||
def remove_roles(self, principal):
|
||||
"""Removes all local roles for the passed principal from the object.
|
||||
|
||||
**Parameters:**
|
||||
|
||||
principal
|
||||
The principal (user or group) from which all local roles are
|
||||
removed.
|
||||
"""
|
||||
return permissions.utils.remove_local_roles(self, principal)
|
13
permissions/admin.py
Normal file
13
permissions/admin.py
Normal file
|
@ -0,0 +1,13 @@
|
|||
from django.contrib import admin
|
||||
|
||||
from permissions.models import ObjectPermission
|
||||
admin.site.register(ObjectPermission)
|
||||
|
||||
from permissions.models import Permission
|
||||
admin.site.register(Permission)
|
||||
|
||||
from permissions.models import Role
|
||||
admin.site.register(Role)
|
||||
|
||||
from permissions.models import PrincipalRoleRelation
|
||||
admin.site.register(PrincipalRoleRelation)
|
45
permissions/backend.py
Normal file
45
permissions/backend.py
Normal file
|
@ -0,0 +1,45 @@
|
|||
# permissions imports
|
||||
import permissions.utils
|
||||
|
||||
class ObjectPermissionsBackend(object):
|
||||
"""Django backend for object permissions. Needs Django 1.2.
|
||||
|
||||
|
||||
Use it together with the default ModelBackend like so::
|
||||
|
||||
AUTHENTICATION_BACKENDS = (
|
||||
'django.contrib.auth.backends.ModelBackend',
|
||||
'permissions.backend.ObjectPermissionsBackend',
|
||||
)
|
||||
|
||||
Then you can use it like:
|
||||
|
||||
user.has_perm("view", your_object)
|
||||
|
||||
"""
|
||||
supports_object_permissions = True
|
||||
supports_anonymous_user = True
|
||||
|
||||
def authenticate(self, username, password):
|
||||
return None
|
||||
|
||||
def has_perm(self, user_obj, perm, obj=None):
|
||||
"""Checks whether the passed user has passed permission for passed
|
||||
object (obj).
|
||||
|
||||
This should be the primary method to check wether a user has a certain
|
||||
permission.
|
||||
|
||||
Parameters
|
||||
==========
|
||||
|
||||
perm
|
||||
The permission's codename which should be checked.
|
||||
|
||||
user_obj
|
||||
The user for which the permission should be checked.
|
||||
|
||||
obj
|
||||
The object for which the permission should be checked.
|
||||
"""
|
||||
return permissions.utils.has_permission(obj, user_obj, perm)
|
3
permissions/exceptions.py
Normal file
3
permissions/exceptions.py
Normal file
|
@ -0,0 +1,3 @@
|
|||
class Unauthorized(Exception):
|
||||
def __init__(self, str):
|
||||
super(Unauthorized, self).__init__(str)
|
23
permissions/fixtures/initial.xml
Normal file
23
permissions/fixtures/initial.xml
Normal file
|
@ -0,0 +1,23 @@
|
|||
<?xml version="1.0" encoding="utf-8"?>
|
||||
<django-objects version="1.0">
|
||||
<object pk="1" model="permissions.permission">
|
||||
<field type="CharField" name="name">View</field>
|
||||
<field type="CharField" name="codename">view</field>
|
||||
</object>
|
||||
<object pk="2" model="permissions.permission">
|
||||
<field type="CharField" name="name">Edit</field>
|
||||
<field type="CharField" name="codename">edit</field>
|
||||
</object>
|
||||
<object pk="3" model="permissions.permission">
|
||||
<field type="CharField" name="name">Delete</field>
|
||||
<field type="CharField" name="codename">delete</field>
|
||||
</object>
|
||||
<object pk="4" model="permissions.permission">
|
||||
<field type="CharField" name="name">Cut</field>
|
||||
<field type="CharField" name="codename">cut</field>
|
||||
</object>
|
||||
<object pk="5" model="permissions.permission">
|
||||
<field type="CharField" name="name">Copy</field>
|
||||
<field type="CharField" name="codename">copy</field>
|
||||
</object>
|
||||
</django-objects>
|
BIN
permissions/locale/de/LC_MESSAGES/django.mo
Normal file
BIN
permissions/locale/de/LC_MESSAGES/django.mo
Normal file
Binary file not shown.
52
permissions/locale/de/LC_MESSAGES/django.po
Normal file
52
permissions/locale/de/LC_MESSAGES/django.po
Normal file
|
@ -0,0 +1,52 @@
|
|||
# German translations for django-permissions
|
||||
# Copyright (C) 2010 Kai Diefenbach
|
||||
# This file is distributed under the same license as the PACKAGE package.
|
||||
# Kai Diefenbach <kai.diefenbach@iqpp.de>, 2010.
|
||||
#
|
||||
msgid ""
|
||||
msgstr ""
|
||||
"Project-Id-Version: 1.0\n"
|
||||
"Report-Msgid-Bugs-To: \n"
|
||||
"POT-Creation-Date: 2010-03-30 23:12+0200\n"
|
||||
"PO-Revision-Date: YEAR-MO-DA HO:MI+ZONE\n"
|
||||
"Last-Translator: FULL NAME <EMAIL@ADDRESS>\n"
|
||||
"Language-Team: LANGUAGE <LL@li.org>\n"
|
||||
"MIME-Version: 1.0\n"
|
||||
"Content-Type: text/plain; charset=UTF-8\n"
|
||||
"Content-Transfer-Encoding: 8bit\n"
|
||||
|
||||
#: models.py:154
|
||||
msgid "Name"
|
||||
msgstr "Name"
|
||||
|
||||
#: models.py:155
|
||||
msgid "Codename"
|
||||
msgstr "Codename"
|
||||
|
||||
#: models.py:156
|
||||
msgid "Content Types"
|
||||
msgstr "Inhaltstypen"
|
||||
|
||||
#: models.py:175 models.py:280
|
||||
msgid "Role"
|
||||
msgstr "Rolle"
|
||||
|
||||
#: models.py:176 models.py:216
|
||||
msgid "Permission"
|
||||
msgstr "Recht"
|
||||
|
||||
#: models.py:178 models.py:218 models.py:282
|
||||
msgid "Content type"
|
||||
msgstr "Inhaltstyp"
|
||||
|
||||
#: models.py:179 models.py:219 models.py:283
|
||||
msgid "Content id"
|
||||
msgstr "Inhalts-ID"
|
||||
|
||||
#: models.py:278
|
||||
msgid "User"
|
||||
msgstr "Benutzer"
|
||||
|
||||
#: models.py:279
|
||||
msgid "Group"
|
||||
msgstr "Gruppe"
|
193
permissions/models.py
Normal file
193
permissions/models.py
Normal file
|
@ -0,0 +1,193 @@
|
|||
# python imports
|
||||
import sets
|
||||
|
||||
# django imports
|
||||
from django.db import models
|
||||
from django.contrib.auth.models import User
|
||||
from django.contrib.auth.models import Group
|
||||
from django.contrib.contenttypes import generic
|
||||
from django.contrib.contenttypes.models import ContentType
|
||||
from django.utils.translation import ugettext_lazy as _
|
||||
|
||||
class Permission(models.Model):
|
||||
"""A permission which can be granted to users/groups and objects.
|
||||
|
||||
**Attributes:**
|
||||
|
||||
name
|
||||
The unique name of the permission. This is displayed to users.
|
||||
|
||||
codename
|
||||
The unique codename of the permission. This is used internal to
|
||||
identify a permission.
|
||||
|
||||
content_types
|
||||
The content types for which the permission is active. This can be
|
||||
used to display only reasonable permissions for an object.
|
||||
"""
|
||||
name = models.CharField(_(u"Name"), max_length=100, unique=True)
|
||||
codename = models.CharField(_(u"Codename"), max_length=100, unique=True)
|
||||
content_types = models.ManyToManyField(ContentType, verbose_name=_(u"Content Types"), blank=True, null=True, related_name="content_types")
|
||||
|
||||
def __unicode__(self):
|
||||
return "%s (%s)" % (self.name, self.codename)
|
||||
|
||||
class ObjectPermission(models.Model):
|
||||
"""Grants permission for specific user/group and object.
|
||||
|
||||
**Attributes:**
|
||||
|
||||
role
|
||||
The role for which the permission is granted.
|
||||
|
||||
permission
|
||||
The permission which is granted.
|
||||
|
||||
content
|
||||
The object for which the permission is granted.
|
||||
"""
|
||||
role = models.ForeignKey("Role", verbose_name=_(u"Role"), blank=True, null=True)
|
||||
permission = models.ForeignKey(Permission, verbose_name=_(u"Permission"))
|
||||
|
||||
content_type = models.ForeignKey(ContentType, verbose_name=_(u"Content type"))
|
||||
content_id = models.PositiveIntegerField(verbose_name=_(u"Content id"))
|
||||
content = generic.GenericForeignKey(ct_field="content_type", fk_field="content_id")
|
||||
|
||||
def __unicode__(self):
|
||||
if self.role:
|
||||
principal = self.role
|
||||
else:
|
||||
principal = self.user
|
||||
|
||||
return "%s / %s / %s - %s" % (self.permission.name, principal, self.content_type, self.content_id)
|
||||
|
||||
def get_principal(self):
|
||||
"""Returns the principal.
|
||||
"""
|
||||
return self.user or self.group
|
||||
|
||||
def set_principal(self, principal):
|
||||
"""Sets the principal.
|
||||
"""
|
||||
if isinstance(principal, User):
|
||||
self.user = principal
|
||||
else:
|
||||
self.group = principal
|
||||
|
||||
principal = property(get_principal, set_principal)
|
||||
|
||||
class ObjectPermissionInheritanceBlock(models.Model):
|
||||
"""Blocks the inheritance for specific permission and object.
|
||||
|
||||
**Attributes:**
|
||||
|
||||
permission
|
||||
The permission for which inheritance is blocked.
|
||||
|
||||
content
|
||||
The object for which the inheritance is blocked.
|
||||
"""
|
||||
permission = models.ForeignKey(Permission, verbose_name=_(u"Permission"))
|
||||
|
||||
content_type = models.ForeignKey(ContentType, verbose_name=_(u"Content type"))
|
||||
content_id = models.PositiveIntegerField(verbose_name=_(u"Content id"))
|
||||
content = generic.GenericForeignKey(ct_field="content_type", fk_field="content_id")
|
||||
|
||||
def __unicode__(self):
|
||||
return "%s / %s - %s" % (self.permission, self.content_type, self.content_id)
|
||||
|
||||
class Role(models.Model):
|
||||
"""A role gets permissions to do something. Principals (users and groups)
|
||||
can only get permissions via roles.
|
||||
|
||||
**Attributes:**
|
||||
|
||||
name
|
||||
The unique name of the role
|
||||
"""
|
||||
name = models.CharField(max_length=100, unique=True)
|
||||
|
||||
class Meta:
|
||||
ordering = ("name", )
|
||||
|
||||
def __unicode__(self):
|
||||
return self.name
|
||||
|
||||
def add_principal(self, principal, content=None):
|
||||
"""Addes the given principal (user or group) ot the Role.
|
||||
"""
|
||||
if isinstance(principal, User):
|
||||
PrincipalRoleRelation.objects.create(user=principal, role=self)
|
||||
else:
|
||||
PrincipalRoleRelation.objects.create(group=principal, role=self)
|
||||
|
||||
def get_groups(self, content=None):
|
||||
"""Returns all groups which has this role assigned. If content is given
|
||||
it returns also the local roles.
|
||||
"""
|
||||
if content:
|
||||
ctype = ContentType.objects.get_for_model(content)
|
||||
prrs = PrincipalRoleRelation.objects.filter(role=self,
|
||||
content_id__in = (None, content.id),
|
||||
content_type__in = (None, ctype)).exclude(group=None)
|
||||
else:
|
||||
prrs = PrincipalRoleRelation.objects.filter(role=self,
|
||||
content_id=None, content_type=None).exclude(group=None)
|
||||
|
||||
return [prr.group for prr in prrs]
|
||||
|
||||
def get_users(self, content=None):
|
||||
"""Returns all users which has this role assigned. If content is given
|
||||
it returns also the local roles.
|
||||
"""
|
||||
if content:
|
||||
ctype = ContentType.objects.get_for_model(content)
|
||||
prrs = PrincipalRoleRelation.objects.filter(role=self,
|
||||
content_id__in = (None, content.id),
|
||||
content_type__in = (None, ctype)).exclude(user=None)
|
||||
else:
|
||||
prrs = PrincipalRoleRelation.objects.filter(role=self,
|
||||
content_id=None, content_type=None).exclude(user=None)
|
||||
|
||||
return [prr.user for prr in prrs]
|
||||
|
||||
class PrincipalRoleRelation(models.Model):
|
||||
"""A role given to a principal (user or group). If a content object is
|
||||
given this is a local role, i.e. the principal has this role only for this
|
||||
content object. Otherwise it is a global role, i.e. the principal has
|
||||
this role generally.
|
||||
|
||||
user
|
||||
A user instance. Either a user xor a group needs to be given.
|
||||
|
||||
group
|
||||
A group instance. Either a user xor a group needs to be given.
|
||||
|
||||
role
|
||||
The role which is given to the principal for content.
|
||||
|
||||
content
|
||||
The content object which gets the local role (optional).
|
||||
"""
|
||||
user = models.ForeignKey(User, verbose_name=_(u"User"), blank=True, null=True)
|
||||
group = models.ForeignKey(Group, verbose_name=_(u"Group"), blank=True, null=True)
|
||||
role = models.ForeignKey(Role, verbose_name=_(u"Role"))
|
||||
|
||||
content_type = models.ForeignKey(ContentType, verbose_name=_(u"Content type"), blank=True, null=True)
|
||||
content_id = models.PositiveIntegerField(verbose_name=_(u"Content id"), blank=True, null=True)
|
||||
content = generic.GenericForeignKey(ct_field="content_type", fk_field="content_id")
|
||||
|
||||
def get_principal(self):
|
||||
"""Returns the principal.
|
||||
"""
|
||||
return self.user or self.group
|
||||
|
||||
def set_principal(self, principal):
|
||||
"""Sets the principal.
|
||||
"""
|
||||
if isinstance(principal, User):
|
||||
self.user = principal
|
||||
else:
|
||||
self.group = principal
|
||||
|
||||
principal = property(get_principal, set_principal)
|
0
permissions/templatetags/__init__.py
Normal file
0
permissions/templatetags/__init__.py
Normal file
48
permissions/templatetags/permissions_tags.py
Normal file
48
permissions/templatetags/permissions_tags.py
Normal file
|
@ -0,0 +1,48 @@
|
|||
# django imports
|
||||
from django import template
|
||||
from django.core.exceptions import ImproperlyConfigured
|
||||
from django.contrib.auth.models import User, AnonymousUser
|
||||
|
||||
import permissions.utils
|
||||
register = template.Library()
|
||||
|
||||
class PermissionComparisonNode(template.Node):
|
||||
"""Implements a node to provide an if current user has passed permission
|
||||
for current object.
|
||||
"""
|
||||
@classmethod
|
||||
def handle_token(cls, parser, token):
|
||||
bits = token.contents.split()
|
||||
if len(bits) != 2:
|
||||
raise template.TemplateSyntaxError(
|
||||
"'%s' tag takes one argument" % bits[0])
|
||||
end_tag = 'endifhasperm'
|
||||
nodelist_true = parser.parse(('else', end_tag))
|
||||
token = parser.next_token()
|
||||
if token.contents == 'else': # there is an 'else' clause in the tag
|
||||
nodelist_false = parser.parse((end_tag,))
|
||||
parser.delete_first_token()
|
||||
else:
|
||||
nodelist_false = ""
|
||||
|
||||
return cls(bits[1], nodelist_true, nodelist_false)
|
||||
|
||||
def __init__(self, permission, nodelist_true, nodelist_false):
|
||||
self.permission = permission
|
||||
self.nodelist_true = nodelist_true
|
||||
self.nodelist_false = nodelist_false
|
||||
|
||||
def render(self, context):
|
||||
obj = context.get("obj")
|
||||
request = context.get("request")
|
||||
if permissions.utils.has_permission(self.permission, request.user, obj):
|
||||
return self.nodelist_true.render(context)
|
||||
else:
|
||||
return self.nodelist_false
|
||||
|
||||
@register.tag
|
||||
def ifhasperm(parser, token):
|
||||
"""This function provides functionality for the 'ifhasperm' template tag.
|
||||
"""
|
||||
return PermissionComparisonNode.handle_token(parser, token)
|
||||
|
783
permissions/tests.py
Normal file
783
permissions/tests.py
Normal file
|
@ -0,0 +1,783 @@
|
|||
# django imports
|
||||
from django.contrib.flatpages.models import FlatPage
|
||||
from django.contrib.auth.models import Group
|
||||
from django.contrib.auth.models import User
|
||||
from django.conf import settings
|
||||
from django.core.urlresolvers import reverse
|
||||
from django.test import TestCase
|
||||
from django.test.client import Client
|
||||
|
||||
# permissions imports
|
||||
from permissions.models import Permission
|
||||
from permissions.models import ObjectPermission
|
||||
from permissions.models import ObjectPermissionInheritanceBlock
|
||||
from permissions.models import Role
|
||||
|
||||
import permissions.utils
|
||||
|
||||
class BackendTestCase(TestCase):
|
||||
"""
|
||||
"""
|
||||
def setUp(self):
|
||||
"""
|
||||
"""
|
||||
settings.AUTHENTICATION_BACKENDS = (
|
||||
'django.contrib.auth.backends.ModelBackend',
|
||||
'permissions.backend.ObjectPermissionsBackend',
|
||||
)
|
||||
|
||||
self.role_1 = permissions.utils.register_role("Role 1")
|
||||
self.user = User.objects.create(username="john")
|
||||
self.page_1 = FlatPage.objects.create(url="/page-1/", title="Page 1")
|
||||
self.view = permissions.utils.register_permission("View", "view")
|
||||
|
||||
# Add user to role
|
||||
self.role_1.add_principal(self.user)
|
||||
|
||||
def test_has_perm(self):
|
||||
"""Tests has perm of the backend.
|
||||
"""
|
||||
result = self.user.has_perm(self.view, self.page_1)
|
||||
self.assertEqual(result, False)
|
||||
|
||||
# assign view permission to role 1
|
||||
permissions.utils.grant_permission(self.page_1, self.role_1, self.view)
|
||||
|
||||
result = self.user.has_perm("view", self.page_1)
|
||||
self.assertEqual(result, True)
|
||||
|
||||
class RoleTestCase(TestCase):
|
||||
"""
|
||||
"""
|
||||
def setUp(self):
|
||||
"""
|
||||
"""
|
||||
self.role_1 = permissions.utils.register_role("Role 1")
|
||||
self.role_2 = permissions.utils.register_role("Role 2")
|
||||
|
||||
self.user = User.objects.create(username="john")
|
||||
self.group = Group.objects.create(name="brights")
|
||||
|
||||
self.user.groups.add(self.group)
|
||||
|
||||
self.page_1 = FlatPage.objects.create(url="/page-1/", title="Page 1")
|
||||
self.page_2 = FlatPage.objects.create(url="/page-1/", title="Page 2")
|
||||
|
||||
def test_getter(self):
|
||||
"""
|
||||
"""
|
||||
result = permissions.utils.get_group(self.group.id)
|
||||
self.assertEqual(result, self.group)
|
||||
|
||||
result = permissions.utils.get_group(42)
|
||||
self.assertEqual(result, None)
|
||||
|
||||
result = permissions.utils.get_role(self.role_1.id)
|
||||
self.assertEqual(result, self.role_1)
|
||||
|
||||
result = permissions.utils.get_role(42)
|
||||
self.assertEqual(result, None)
|
||||
|
||||
result = permissions.utils.get_user(self.user.id)
|
||||
self.assertEqual(result, self.user)
|
||||
|
||||
result = permissions.utils.get_user(42)
|
||||
self.assertEqual(result, None)
|
||||
|
||||
def test_global_roles_user(self):
|
||||
"""
|
||||
"""
|
||||
# Add role 1
|
||||
result = permissions.utils.add_role(self.user, self.role_1)
|
||||
self.assertEqual(result, True)
|
||||
|
||||
# Add role 1 again
|
||||
result = permissions.utils.add_role(self.user, self.role_1)
|
||||
self.assertEqual(result, False)
|
||||
|
||||
result = permissions.utils.get_roles(self.user)
|
||||
self.assertEqual(result, [self.role_1])
|
||||
|
||||
# Add role 2
|
||||
result = permissions.utils.add_role(self.user, self.role_2)
|
||||
self.assertEqual(result, True)
|
||||
|
||||
result = permissions.utils.get_roles(self.user)
|
||||
self.assertEqual(result, [self.role_1, self.role_2])
|
||||
|
||||
# Remove role 1
|
||||
result = permissions.utils.remove_role(self.user, self.role_1)
|
||||
self.assertEqual(result, True)
|
||||
|
||||
# Remove role 1 again
|
||||
result = permissions.utils.remove_role(self.user, self.role_1)
|
||||
self.assertEqual(result, False)
|
||||
|
||||
result = permissions.utils.get_roles(self.user)
|
||||
self.assertEqual(result, [self.role_2])
|
||||
|
||||
# Remove role 2
|
||||
result = permissions.utils.remove_role(self.user, self.role_2)
|
||||
self.assertEqual(result, True)
|
||||
|
||||
result = permissions.utils.get_roles(self.user)
|
||||
self.assertEqual(result, [])
|
||||
|
||||
def test_global_roles_group(self):
|
||||
"""
|
||||
"""
|
||||
# Add role 1
|
||||
result = permissions.utils.add_role(self.group, self.role_1)
|
||||
self.assertEqual(result, True)
|
||||
|
||||
# Add role 1 again
|
||||
result = permissions.utils.add_role(self.group, self.role_1)
|
||||
self.assertEqual(result, False)
|
||||
|
||||
result = permissions.utils.get_roles(self.group)
|
||||
self.assertEqual(result, [self.role_1])
|
||||
|
||||
# Add role 2
|
||||
result = permissions.utils.add_role(self.group, self.role_2)
|
||||
self.assertEqual(result, True)
|
||||
|
||||
result = permissions.utils.get_roles(self.group)
|
||||
self.assertEqual(result, [self.role_1, self.role_2])
|
||||
|
||||
# Remove role 1
|
||||
result = permissions.utils.remove_role(self.group, self.role_1)
|
||||
self.assertEqual(result, True)
|
||||
|
||||
# Remove role 1 again
|
||||
result = permissions.utils.remove_role(self.group, self.role_1)
|
||||
self.assertEqual(result, False)
|
||||
|
||||
result = permissions.utils.get_roles(self.group)
|
||||
self.assertEqual(result, [self.role_2])
|
||||
|
||||
# Remove role 2
|
||||
result = permissions.utils.remove_role(self.group, self.role_2)
|
||||
self.assertEqual(result, True)
|
||||
|
||||
result = permissions.utils.get_roles(self.group)
|
||||
self.assertEqual(result, [])
|
||||
|
||||
def test_remove_roles_user(self):
|
||||
"""
|
||||
"""
|
||||
# Add role 1
|
||||
result = permissions.utils.add_role(self.user, self.role_1)
|
||||
self.assertEqual(result, True)
|
||||
|
||||
# Add role 2
|
||||
result = permissions.utils.add_role(self.user, self.role_2)
|
||||
self.assertEqual(result, True)
|
||||
|
||||
result = permissions.utils.get_roles(self.user)
|
||||
self.assertEqual(result, [self.role_1, self.role_2])
|
||||
|
||||
# Remove roles
|
||||
result = permissions.utils.remove_roles(self.user)
|
||||
self.assertEqual(result, True)
|
||||
|
||||
result = permissions.utils.get_roles(self.user)
|
||||
self.assertEqual(result, [])
|
||||
|
||||
# Remove roles
|
||||
result = permissions.utils.remove_roles(self.user)
|
||||
self.assertEqual(result, False)
|
||||
|
||||
def test_remove_roles_group(self):
|
||||
"""
|
||||
"""
|
||||
# Add role 1
|
||||
result = permissions.utils.add_role(self.group, self.role_1)
|
||||
self.assertEqual(result, True)
|
||||
|
||||
# Add role 2
|
||||
result = permissions.utils.add_role(self.group, self.role_2)
|
||||
self.assertEqual(result, True)
|
||||
|
||||
result = permissions.utils.get_roles(self.group)
|
||||
self.assertEqual(result, [self.role_1, self.role_2])
|
||||
|
||||
# Remove roles
|
||||
result = permissions.utils.remove_roles(self.group)
|
||||
self.assertEqual(result, True)
|
||||
|
||||
result = permissions.utils.get_roles(self.group)
|
||||
self.assertEqual(result, [])
|
||||
|
||||
# Remove roles
|
||||
result = permissions.utils.remove_roles(self.group)
|
||||
self.assertEqual(result, False)
|
||||
|
||||
def test_local_role_user(self):
|
||||
"""
|
||||
"""
|
||||
# Add local role to page 1
|
||||
result = permissions.utils.add_local_role(self.page_1, self.user, self.role_1)
|
||||
self.assertEqual(result, True)
|
||||
|
||||
# Again
|
||||
result = permissions.utils.add_local_role(self.page_1, self.user, self.role_1)
|
||||
self.assertEqual(result, False)
|
||||
|
||||
result = permissions.utils.get_local_roles(self.page_1, self.user)
|
||||
self.assertEqual(result, [self.role_1])
|
||||
|
||||
# Add local role 2
|
||||
result = permissions.utils.add_local_role(self.page_1, self.user, self.role_2)
|
||||
self.assertEqual(result, True)
|
||||
|
||||
result = permissions.utils.get_local_roles(self.page_1, self.user)
|
||||
self.assertEqual(result, [self.role_1, self.role_2])
|
||||
|
||||
# Remove role 1
|
||||
result = permissions.utils.remove_local_role(self.page_1, self.user, self.role_1)
|
||||
self.assertEqual(result, True)
|
||||
|
||||
# Remove role 1 again
|
||||
result = permissions.utils.remove_local_role(self.page_1, self.user, self.role_1)
|
||||
self.assertEqual(result, False)
|
||||
|
||||
result = permissions.utils.get_local_roles(self.page_1, self.user)
|
||||
self.assertEqual(result, [self.role_2])
|
||||
|
||||
# Remove role 2
|
||||
result = permissions.utils.remove_local_role(self.page_1, self.user, self.role_2)
|
||||
self.assertEqual(result, True)
|
||||
|
||||
result = permissions.utils.get_local_roles(self.page_1, self.user)
|
||||
self.assertEqual(result, [])
|
||||
|
||||
def test_local_role_group(self):
|
||||
"""
|
||||
"""
|
||||
# Add local role to page 1
|
||||
result = permissions.utils.add_local_role(self.page_1, self.group, self.role_1)
|
||||
self.assertEqual(result, True)
|
||||
|
||||
# Again
|
||||
result = permissions.utils.add_local_role(self.page_1, self.group, self.role_1)
|
||||
self.assertEqual(result, False)
|
||||
|
||||
result = permissions.utils.get_local_roles(self.page_1, self.group)
|
||||
self.assertEqual(result, [self.role_1])
|
||||
|
||||
# Add local role 2
|
||||
result = permissions.utils.add_local_role(self.page_1, self.group, self.role_2)
|
||||
self.assertEqual(result, True)
|
||||
|
||||
result = permissions.utils.get_local_roles(self.page_1, self.group)
|
||||
self.assertEqual(result, [self.role_1, self.role_2])
|
||||
|
||||
# Remove role 1
|
||||
result = permissions.utils.remove_local_role(self.page_1, self.group, self.role_1)
|
||||
self.assertEqual(result, True)
|
||||
|
||||
# Remove role 1 again
|
||||
result = permissions.utils.remove_local_role(self.page_1, self.group, self.role_1)
|
||||
self.assertEqual(result, False)
|
||||
|
||||
result = permissions.utils.get_local_roles(self.page_1, self.group)
|
||||
self.assertEqual(result, [self.role_2])
|
||||
|
||||
# Remove role 2
|
||||
result = permissions.utils.remove_local_role(self.page_1, self.group, self.role_2)
|
||||
self.assertEqual(result, True)
|
||||
|
||||
result = permissions.utils.get_local_roles(self.page_1, self.group)
|
||||
self.assertEqual(result, [])
|
||||
|
||||
def test_remove_local_roles_user(self):
|
||||
"""
|
||||
"""
|
||||
# Add local role to page 1
|
||||
result = permissions.utils.add_local_role(self.page_1, self.user, self.role_1)
|
||||
self.assertEqual(result, True)
|
||||
|
||||
# Add local role 2
|
||||
result = permissions.utils.add_local_role(self.page_1, self.user, self.role_2)
|
||||
self.assertEqual(result, True)
|
||||
|
||||
result = permissions.utils.get_local_roles(self.page_1, self.user)
|
||||
self.assertEqual(result, [self.role_1, self.role_2])
|
||||
|
||||
# Remove all local roles
|
||||
result = permissions.utils.remove_local_roles(self.page_1, self.user)
|
||||
self.assertEqual(result, True)
|
||||
|
||||
result = permissions.utils.get_local_roles(self.page_1, self.user)
|
||||
self.assertEqual(result, [])
|
||||
|
||||
# Remove all local roles again
|
||||
result = permissions.utils.remove_local_roles(self.page_1, self.user)
|
||||
self.assertEqual(result, False)
|
||||
|
||||
def test_get_groups_1(self):
|
||||
"""Tests global roles for groups.
|
||||
"""
|
||||
result = self.role_1.get_groups()
|
||||
self.assertEqual(len(result), 0)
|
||||
|
||||
result = permissions.utils.add_role(self.group, self.role_1)
|
||||
self.assertEqual(result, True)
|
||||
|
||||
result = self.role_1.get_groups()
|
||||
self.assertEqual(result[0].name, "brights")
|
||||
|
||||
# Add another group
|
||||
self.group_2 = Group.objects.create(name="atheists")
|
||||
result = permissions.utils.add_role(self.group_2, self.role_1)
|
||||
|
||||
result = self.role_1.get_groups()
|
||||
self.assertEqual(result[0].name, "brights")
|
||||
self.assertEqual(result[1].name, "atheists")
|
||||
self.assertEqual(len(result), 2)
|
||||
|
||||
# Add the role to an user
|
||||
result = permissions.utils.add_role(self.user, self.role_1)
|
||||
self.assertEqual(result, True)
|
||||
|
||||
# This shouldn't have an effect on the result
|
||||
result = self.role_1.get_groups()
|
||||
self.assertEqual(result[0].name, "brights")
|
||||
self.assertEqual(result[1].name, "atheists")
|
||||
self.assertEqual(len(result), 2)
|
||||
|
||||
def test_get_groups_2(self):
|
||||
"""Tests local roles for groups.
|
||||
"""
|
||||
result = self.role_1.get_groups(self.page_1)
|
||||
self.assertEqual(len(result), 0)
|
||||
|
||||
result = permissions.utils.add_local_role(self.page_1, self.group, self.role_1)
|
||||
self.assertEqual(result, True)
|
||||
|
||||
result = self.role_1.get_groups(self.page_1)
|
||||
self.assertEqual(result[0].name, "brights")
|
||||
|
||||
# Add another local group
|
||||
self.group_2 = Group.objects.create(name="atheists")
|
||||
result = permissions.utils.add_local_role(self.page_1, self.group_2, self.role_1)
|
||||
|
||||
result = self.role_1.get_groups(self.page_1)
|
||||
self.assertEqual(result[0].name, "brights")
|
||||
self.assertEqual(result[1].name, "atheists")
|
||||
|
||||
# A the global role to group
|
||||
result = permissions.utils.add_role(self.group, self.role_1)
|
||||
self.assertEqual(result, True)
|
||||
|
||||
# Nontheless there are just two groups returned (and no duplicate)
|
||||
result = self.role_1.get_groups(self.page_1)
|
||||
self.assertEqual(result[0].name, "brights")
|
||||
self.assertEqual(result[1].name, "atheists")
|
||||
self.assertEqual(len(result), 2)
|
||||
|
||||
# Andere there should one global role
|
||||
result = self.role_1.get_groups()
|
||||
self.assertEqual(result[0].name, "brights")
|
||||
|
||||
# Add the role to an user
|
||||
result = permissions.utils.add_local_role(self.page_1, self.user, self.role_1)
|
||||
self.assertEqual(result, True)
|
||||
|
||||
# This shouldn't have an effect on the result
|
||||
result = self.role_1.get_groups(self.page_1)
|
||||
self.assertEqual(result[0].name, "brights")
|
||||
self.assertEqual(result[1].name, "atheists")
|
||||
self.assertEqual(len(result), 2)
|
||||
|
||||
def test_get_users_1(self):
|
||||
"""Tests global roles for users.
|
||||
"""
|
||||
result = self.role_1.get_users()
|
||||
self.assertEqual(len(result), 0)
|
||||
|
||||
result = permissions.utils.add_role(self.user, self.role_1)
|
||||
self.assertEqual(result, True)
|
||||
|
||||
result = self.role_1.get_users()
|
||||
self.assertEqual(result[0].username, "john")
|
||||
|
||||
# Add another role to an user
|
||||
self.user_2 = User.objects.create(username="jane")
|
||||
result = permissions.utils.add_role(self.user_2, self.role_1)
|
||||
|
||||
result = self.role_1.get_users()
|
||||
self.assertEqual(result[0].username, "john")
|
||||
self.assertEqual(result[1].username, "jane")
|
||||
self.assertEqual(len(result), 2)
|
||||
|
||||
# Add the role to an user
|
||||
result = permissions.utils.add_role(self.group, self.role_1)
|
||||
self.assertEqual(result, True)
|
||||
|
||||
# This shouldn't have an effect on the result
|
||||
result = self.role_1.get_users()
|
||||
self.assertEqual(result[0].username, "john")
|
||||
self.assertEqual(result[1].username, "jane")
|
||||
self.assertEqual(len(result), 2)
|
||||
|
||||
def test_get_users_2(self):
|
||||
"""Tests local roles for users.
|
||||
"""
|
||||
result = self.role_1.get_users(self.page_1)
|
||||
self.assertEqual(len(result), 0)
|
||||
|
||||
result = permissions.utils.add_local_role(self.page_1, self.user, self.role_1)
|
||||
self.assertEqual(result, True)
|
||||
|
||||
result = self.role_1.get_users(self.page_1)
|
||||
self.assertEqual(result[0].username, "john")
|
||||
|
||||
# Add another local role to an user
|
||||
self.user_2 = User.objects.create(username="jane")
|
||||
result = permissions.utils.add_local_role(self.page_1, self.user_2, self.role_1)
|
||||
|
||||
result = self.role_1.get_users(self.page_1)
|
||||
self.assertEqual(result[0].username, "john")
|
||||
self.assertEqual(result[1].username, "jane")
|
||||
|
||||
# A the global role to user
|
||||
result = permissions.utils.add_role(self.user, self.role_1)
|
||||
self.assertEqual(result, True)
|
||||
|
||||
# Nontheless there are just two users returned (and no duplicate)
|
||||
result = self.role_1.get_users(self.page_1)
|
||||
self.assertEqual(result[0].username, "john")
|
||||
self.assertEqual(result[1].username, "jane")
|
||||
self.assertEqual(len(result), 2)
|
||||
|
||||
# Andere there should one user for the global role
|
||||
result = self.role_1.get_users()
|
||||
self.assertEqual(result[0].username, "john")
|
||||
|
||||
# Add the role to an group
|
||||
result = permissions.utils.add_local_role(self.page_1, self.group, self.role_1)
|
||||
self.assertEqual(result, True)
|
||||
|
||||
# This shouldn't have an effect on the result
|
||||
result = self.role_1.get_users(self.page_1)
|
||||
self.assertEqual(result[0].username, "john")
|
||||
self.assertEqual(result[1].username, "jane")
|
||||
self.assertEqual(len(result), 2)
|
||||
|
||||
class PermissionTestCase(TestCase):
|
||||
"""
|
||||
"""
|
||||
def setUp(self):
|
||||
"""
|
||||
"""
|
||||
self.role_1 = permissions.utils.register_role("Role 1")
|
||||
self.role_2 = permissions.utils.register_role("Role 2")
|
||||
|
||||
self.user = User.objects.create(username="john")
|
||||
permissions.utils.add_role(self.user, self.role_1)
|
||||
self.user.save()
|
||||
|
||||
self.page_1 = FlatPage.objects.create(url="/page-1/", title="Page 1")
|
||||
self.page_2 = FlatPage.objects.create(url="/page-1/", title="Page 2")
|
||||
|
||||
self.permission = permissions.utils.register_permission("View", "view")
|
||||
|
||||
def test_add_permissions(self):
|
||||
"""
|
||||
"""
|
||||
# Add per object
|
||||
result = permissions.utils.grant_permission(self.page_1, self.role_1, self.permission)
|
||||
self.assertEqual(result, True)
|
||||
|
||||
# Add per codename
|
||||
result = permissions.utils.grant_permission(self.page_1, self.role_1, "view")
|
||||
self.assertEqual(result, True)
|
||||
|
||||
# Add ermission which does not exist
|
||||
result = permissions.utils.grant_permission(self.page_1, self.role_1, "hurz")
|
||||
self.assertEqual(result, False)
|
||||
|
||||
def test_remove_permission(self):
|
||||
"""
|
||||
"""
|
||||
# Add
|
||||
result = permissions.utils.grant_permission(self.page_1, self.role_1, "view")
|
||||
self.assertEqual(result, True)
|
||||
|
||||
# Remove
|
||||
result = permissions.utils.remove_permission(self.page_1, self.role_1, "view")
|
||||
self.assertEqual(result, True)
|
||||
|
||||
# Remove again
|
||||
result = permissions.utils.remove_permission(self.page_1, self.role_1, "view")
|
||||
self.assertEqual(result, False)
|
||||
|
||||
def test_has_permission_role(self):
|
||||
"""
|
||||
"""
|
||||
result = permissions.utils.has_permission(self.page_1, self.user, "view")
|
||||
self.assertEqual(result, False)
|
||||
|
||||
result = permissions.utils.grant_permission(self.page_1, self.role_1, self.permission)
|
||||
self.assertEqual(result, True)
|
||||
|
||||
result = permissions.utils.has_permission(self.page_1, self.user, "view")
|
||||
self.assertEqual(result, True)
|
||||
|
||||
result = permissions.utils.remove_permission(self.page_1, self.role_1, "view")
|
||||
self.assertEqual(result, True)
|
||||
|
||||
result = permissions.utils.has_permission(self.page_1, self.user, "view")
|
||||
self.assertEqual(result, False)
|
||||
|
||||
def test_has_permission_owner(self):
|
||||
"""
|
||||
"""
|
||||
creator = User.objects.create(username="jane")
|
||||
|
||||
result = permissions.utils.has_permission(self.page_1, creator, "view")
|
||||
self.assertEqual(result, False)
|
||||
|
||||
owner = permissions.utils.register_role("Owner")
|
||||
permissions.utils.grant_permission(self.page_1, owner, "view")
|
||||
|
||||
result = permissions.utils.has_permission(self.page_1, creator, "view", [owner])
|
||||
self.assertEqual(result, True)
|
||||
|
||||
def test_local_role(self):
|
||||
"""
|
||||
"""
|
||||
result = permissions.utils.has_permission(self.page_1, self.user, "view")
|
||||
self.assertEqual(result, False)
|
||||
|
||||
permissions.utils.grant_permission(self.page_1, self.role_2, self.permission)
|
||||
permissions.utils.add_local_role(self.page_1, self.user, self.role_2)
|
||||
|
||||
result = permissions.utils.has_permission(self.page_1, self.user, "view")
|
||||
self.assertEqual(result, True)
|
||||
|
||||
def test_ineritance(self):
|
||||
"""
|
||||
"""
|
||||
result = permissions.utils.is_inherited(self.page_1, "view")
|
||||
self.assertEqual(result, True)
|
||||
|
||||
# per permission
|
||||
permissions.utils.add_inheritance_block(self.page_1, self.permission)
|
||||
|
||||
result = permissions.utils.is_inherited(self.page_1, "view")
|
||||
self.assertEqual(result, False)
|
||||
|
||||
permissions.utils.remove_inheritance_block(self.page_1, self.permission)
|
||||
|
||||
result = permissions.utils.is_inherited(self.page_1, "view")
|
||||
self.assertEqual(result, True)
|
||||
|
||||
# per codename
|
||||
permissions.utils.add_inheritance_block(self.page_1, "view")
|
||||
|
||||
result = permissions.utils.is_inherited(self.page_1, "view")
|
||||
self.assertEqual(result, False)
|
||||
|
||||
permissions.utils.remove_inheritance_block(self.page_1, "view")
|
||||
|
||||
result = permissions.utils.is_inherited(self.page_1, "view")
|
||||
self.assertEqual(result, True)
|
||||
|
||||
def test_unicode(self):
|
||||
"""
|
||||
"""
|
||||
# Permission
|
||||
self.assertEqual(self.permission.__unicode__(), "View (view)")
|
||||
|
||||
# ObjectPermission
|
||||
permissions.utils.grant_permission(self.page_1, self.role_1, self.permission)
|
||||
opr = ObjectPermission.objects.get(permission=self.permission, role=self.role_1)
|
||||
self.assertEqual(opr.__unicode__(), "View / Role 1 / flat page - 1")
|
||||
|
||||
# ObjectPermissionInheritanceBlock
|
||||
permissions.utils.add_inheritance_block(self.page_1, self.permission)
|
||||
opb = ObjectPermissionInheritanceBlock.objects.get(permission=self.permission)
|
||||
|
||||
self.assertEqual(opb.__unicode__(), "View (view) / flat page - 1")
|
||||
|
||||
def test_reset(self):
|
||||
"""
|
||||
"""
|
||||
result = permissions.utils.grant_permission(self.page_1, self.role_1, "view")
|
||||
self.assertEqual(result, True)
|
||||
|
||||
result = permissions.utils.has_permission(self.page_1, self.user, "view")
|
||||
self.assertEqual(result, True)
|
||||
|
||||
permissions.utils.add_inheritance_block(self.page_1, "view")
|
||||
|
||||
result = permissions.utils.is_inherited(self.page_1, "view")
|
||||
self.assertEqual(result, False)
|
||||
|
||||
permissions.utils.reset(self.page_1)
|
||||
|
||||
result = permissions.utils.has_permission(self.page_1, self.user, "view")
|
||||
self.assertEqual(result, False)
|
||||
|
||||
result = permissions.utils.is_inherited(self.page_1, "view")
|
||||
self.assertEqual(result, True)
|
||||
|
||||
permissions.utils.reset(self.page_1)
|
||||
|
||||
class RegistrationTestCase(TestCase):
|
||||
"""Tests the registration of different components.
|
||||
"""
|
||||
def test_group(self):
|
||||
"""Tests registering/unregistering of a group.
|
||||
"""
|
||||
# Register a group
|
||||
result = permissions.utils.register_group("Brights")
|
||||
self.failUnless(isinstance(result, Group))
|
||||
|
||||
# It's there
|
||||
group = Group.objects.get(name="Brights")
|
||||
self.assertEqual(group.name, "Brights")
|
||||
|
||||
# Trying to register another group with same name
|
||||
result = permissions.utils.register_group("Brights")
|
||||
self.assertEqual(result, False)
|
||||
|
||||
group = Group.objects.get(name="Brights")
|
||||
self.assertEqual(group.name, "Brights")
|
||||
|
||||
# Unregister the group
|
||||
result = permissions.utils.unregister_group("Brights")
|
||||
self.assertEqual(result, True)
|
||||
|
||||
# It's not there anymore
|
||||
self.assertRaises(Group.DoesNotExist, Group.objects.get, name="Brights")
|
||||
|
||||
# Trying to unregister the group again
|
||||
result = permissions.utils.unregister_group("Brights")
|
||||
self.assertEqual(result, False)
|
||||
|
||||
def test_role(self):
|
||||
"""Tests registering/unregistering of a role.
|
||||
"""
|
||||
# Register a role
|
||||
result = permissions.utils.register_role("Editor")
|
||||
self.failUnless(isinstance(result, Role))
|
||||
|
||||
# It's there
|
||||
role = Role.objects.get(name="Editor")
|
||||
self.assertEqual(role.name, "Editor")
|
||||
|
||||
# Trying to register another role with same name
|
||||
result = permissions.utils.register_role("Editor")
|
||||
self.assertEqual(result, False)
|
||||
|
||||
role = Role.objects.get(name="Editor")
|
||||
self.assertEqual(role.name, "Editor")
|
||||
|
||||
# Unregister the role
|
||||
result = permissions.utils.unregister_role("Editor")
|
||||
self.assertEqual(result, True)
|
||||
|
||||
# It's not there anymore
|
||||
self.assertRaises(Role.DoesNotExist, Role.objects.get, name="Editor")
|
||||
|
||||
# Trying to unregister the role again
|
||||
result = permissions.utils.unregister_role("Editor")
|
||||
self.assertEqual(result, False)
|
||||
|
||||
def test_permission(self):
|
||||
"""Tests registering/unregistering of a permission.
|
||||
"""
|
||||
# Register a permission
|
||||
result = permissions.utils.register_permission("Change", "change")
|
||||
self.failUnless(isinstance(result, Permission))
|
||||
|
||||
# Is it there?
|
||||
p = Permission.objects.get(codename="change")
|
||||
self.assertEqual(p.name, "Change")
|
||||
|
||||
# Register a permission with the same codename
|
||||
result = permissions.utils.register_permission("Change2", "change")
|
||||
self.assertEqual(result, False)
|
||||
|
||||
# Is it there?
|
||||
p = Permission.objects.get(codename="change")
|
||||
self.assertEqual(p.name, "Change")
|
||||
|
||||
# Register a permission with the same name
|
||||
result = permissions.utils.register_permission("Change", "change2")
|
||||
self.assertEqual(result, False)
|
||||
|
||||
# Is it there?
|
||||
p = Permission.objects.get(codename="change")
|
||||
self.assertEqual(p.name, "Change")
|
||||
|
||||
# Unregister the permission
|
||||
result = permissions.utils.unregister_permission("change")
|
||||
self.assertEqual(result, True)
|
||||
|
||||
# Is it not there anymore?
|
||||
self.assertRaises(Permission.DoesNotExist, Permission.objects.get, codename="change")
|
||||
|
||||
# Unregister the permission again
|
||||
result = permissions.utils.unregister_permission("change")
|
||||
self.assertEqual(result, False)
|
||||
|
||||
# django imports
|
||||
from django.core.handlers.wsgi import WSGIRequest
|
||||
from django.contrib.auth.models import User
|
||||
from django.contrib.sessions.backends.file import SessionStore
|
||||
from django.test.client import Client
|
||||
|
||||
# Taken from "http://www.djangosnippets.org/snippets/963/"
|
||||
class RequestFactory(Client):
|
||||
"""
|
||||
Class that lets you create mock Request objects for use in testing.
|
||||
|
||||
Usage:
|
||||
|
||||
rf = RequestFactory()
|
||||
get_request = rf.get('/hello/')
|
||||
post_request = rf.post('/submit/', {'foo': 'bar'})
|
||||
|
||||
This class re-uses the django.test.client.Client interface, docs here:
|
||||
http://www.djangoproject.com/documentation/testing/#the-test-client
|
||||
|
||||
Once you have a request object you can pass it to any view function,
|
||||
just as if that view had been hooked up using a URLconf.
|
||||
|
||||
"""
|
||||
def request(self, **request):
|
||||
"""
|
||||
Similar to parent class, but returns the request object as soon as it
|
||||
has created it.
|
||||
"""
|
||||
environ = {
|
||||
'HTTP_COOKIE': self.cookies,
|
||||
'PATH_INFO': '/',
|
||||
'QUERY_STRING': '',
|
||||
'REQUEST_METHOD': 'GET',
|
||||
'SCRIPT_NAME': '',
|
||||
'SERVER_NAME': 'testserver',
|
||||
'SERVER_PORT': 80,
|
||||
'SERVER_PROTOCOL': 'HTTP/1.1',
|
||||
}
|
||||
environ.update(self.defaults)
|
||||
environ.update(request)
|
||||
return WSGIRequest(environ)
|
||||
|
||||
def create_request():
|
||||
"""
|
||||
"""
|
||||
rf = RequestFactory()
|
||||
request = rf.get('/')
|
||||
request.session = SessionStore()
|
||||
|
||||
user = User()
|
||||
user.is_superuser = True
|
||||
user.save()
|
||||
request.user = user
|
||||
|
||||
return request
|
665
permissions/utils.py
Normal file
665
permissions/utils.py
Normal file
|
@ -0,0 +1,665 @@
|
|||
# django imports
|
||||
from django.db import IntegrityError
|
||||
from django.db.models import Q
|
||||
from django.db import connection
|
||||
from django.contrib.auth.models import User
|
||||
from django.contrib.auth.models import Group
|
||||
from django.contrib.contenttypes.models import ContentType
|
||||
from django.core.cache import cache
|
||||
from django.core.exceptions import ObjectDoesNotExist
|
||||
|
||||
# permissions imports
|
||||
from permissions.exceptions import Unauthorized
|
||||
from permissions.models import ObjectPermission
|
||||
from permissions.models import ObjectPermissionInheritanceBlock
|
||||
from permissions.models import Permission
|
||||
from permissions.models import PrincipalRoleRelation
|
||||
from permissions.models import Role
|
||||
|
||||
|
||||
# Roles ######################################################################
|
||||
|
||||
def add_role(principal, role):
|
||||
"""Adds a global role to a principal.
|
||||
|
||||
**Parameters:**
|
||||
|
||||
principal
|
||||
The principal (user or group) which gets the role added.
|
||||
|
||||
role
|
||||
The role which is assigned.
|
||||
"""
|
||||
if isinstance(principal, User):
|
||||
try:
|
||||
ppr = PrincipalRoleRelation.objects.get(user=principal, role=role, content_id=None, content_type=None)
|
||||
except PrincipalRoleRelation.DoesNotExist:
|
||||
PrincipalRoleRelation.objects.create(user=principal, role=role)
|
||||
return True
|
||||
else:
|
||||
try:
|
||||
ppr = PrincipalRoleRelation.objects.get(group=principal, role=role, content_id=None, content_type=None)
|
||||
except PrincipalRoleRelation.DoesNotExist:
|
||||
PrincipalRoleRelation.objects.create(group=principal, role=role)
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def add_local_role(obj, principal, role):
|
||||
"""Adds a local role to a principal.
|
||||
|
||||
**Parameters:**
|
||||
|
||||
obj
|
||||
The object for which the principal gets the role.
|
||||
|
||||
principal
|
||||
The principal (user or group) which gets the role.
|
||||
|
||||
role
|
||||
The role which is assigned.
|
||||
"""
|
||||
ctype = ContentType.objects.get_for_model(obj)
|
||||
if isinstance(principal, User):
|
||||
try:
|
||||
ppr = PrincipalRoleRelation.objects.get(user=principal, role=role, content_id=obj.id, content_type=ctype)
|
||||
except PrincipalRoleRelation.DoesNotExist:
|
||||
PrincipalRoleRelation.objects.create(user=principal, role=role, content=obj)
|
||||
return True
|
||||
else:
|
||||
try:
|
||||
ppr = PrincipalRoleRelation.objects.get(group=principal, role=role, content_id=obj.id, content_type=ctype)
|
||||
except PrincipalRoleRelation.DoesNotExist:
|
||||
PrincipalRoleRelation.objects.create(group=principal, role=role, content=obj)
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def remove_role(principal, role):
|
||||
"""Removes role from passed principal.
|
||||
|
||||
**Parameters:**
|
||||
|
||||
principal
|
||||
The principal (user or group) from which the role is removed.
|
||||
|
||||
role
|
||||
The role which is removed.
|
||||
"""
|
||||
try:
|
||||
if isinstance(principal, User):
|
||||
ppr = PrincipalRoleRelation.objects.get(
|
||||
user=principal, role=role, content_id=None, content_type=None)
|
||||
else:
|
||||
ppr = PrincipalRoleRelation.objects.get(
|
||||
group=principal, role=role, content_id=None, content_type=None)
|
||||
|
||||
except PrincipalRoleRelation.DoesNotExist:
|
||||
return False
|
||||
else:
|
||||
ppr.delete()
|
||||
|
||||
return True
|
||||
|
||||
def remove_local_role(obj, principal, role):
|
||||
"""Removes role from passed object and principle.
|
||||
|
||||
**Parameters:**
|
||||
|
||||
obj
|
||||
The object from which the role is removed.
|
||||
|
||||
principal
|
||||
The principal (user or group) from which the role is removed.
|
||||
|
||||
role
|
||||
The role which is removed.
|
||||
"""
|
||||
try:
|
||||
ctype = ContentType.objects.get_for_model(obj)
|
||||
|
||||
if isinstance(principal, User):
|
||||
ppr = PrincipalRoleRelation.objects.get(
|
||||
user=principal, role=role, content_id=obj.id, content_type=ctype)
|
||||
else:
|
||||
ppr = PrincipalRoleRelation.objects.get(
|
||||
group=principal, role=role, content_id=obj.id, content_type=ctype)
|
||||
|
||||
except PrincipalRoleRelation.DoesNotExist:
|
||||
return False
|
||||
else:
|
||||
ppr.delete()
|
||||
|
||||
return True
|
||||
|
||||
def remove_roles(principal):
|
||||
"""Removes all roles passed principal (user or group).
|
||||
|
||||
**Parameters:**
|
||||
|
||||
principal
|
||||
The principal (user or group) from which all roles are removed.
|
||||
"""
|
||||
if isinstance(principal, User):
|
||||
ppr = PrincipalRoleRelation.objects.filter(
|
||||
user=principal, content_id=None, content_type=None)
|
||||
else:
|
||||
ppr = PrincipalRoleRelation.objects.filter(
|
||||
group=principal, content_id=None, content_type=None)
|
||||
|
||||
if ppr:
|
||||
ppr.delete()
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
def remove_local_roles(obj, principal):
|
||||
"""Removes all local roles from passed object and principal (user or
|
||||
group).
|
||||
|
||||
**Parameters:**
|
||||
|
||||
obj
|
||||
The object from which the roles are removed.
|
||||
|
||||
principal
|
||||
The principal (user or group) from which the roles are removed.
|
||||
"""
|
||||
ctype = ContentType.objects.get_for_model(obj)
|
||||
|
||||
if isinstance(principal, User):
|
||||
ppr = PrincipalRoleRelation.objects.filter(
|
||||
user=principal, content_id=obj.id, content_type=ctype)
|
||||
else:
|
||||
ppr = PrincipalRoleRelation.objects.filter(
|
||||
group=principal, content_id=obj.id, content_type=ctype)
|
||||
|
||||
if ppr:
|
||||
ppr.delete()
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
def get_roles(user, obj=None):
|
||||
"""Returns *all* roles of the passed user.
|
||||
|
||||
This takes direct roles and roles via the user's groups into account.
|
||||
|
||||
If an object is passed local roles will also added. Then all local roles
|
||||
from all ancestors and all user's groups are also taken into account.
|
||||
|
||||
This is the method to use if one want to know whether the passed user
|
||||
has a role in general (for the passed object).
|
||||
|
||||
**Parameters:**
|
||||
|
||||
user
|
||||
The user for which the roles are returned.
|
||||
|
||||
obj
|
||||
The object for which local roles will returned.
|
||||
|
||||
"""
|
||||
roles = []
|
||||
groups = user.groups.all()
|
||||
groups_ids_str = ", ".join([str(g.id) for g in groups])
|
||||
|
||||
# Gobal roles for user and the user's groups
|
||||
cursor = connection.cursor()
|
||||
cursor.execute("""SELECT role_id
|
||||
FROM permissions_principalrolerelation
|
||||
WHERE (user_id=%s OR group_id IN (%s))
|
||||
AND content_id is Null""" % (user.id, groups_ids_str))
|
||||
|
||||
for row in cursor.fetchall():
|
||||
roles.append(row[0])
|
||||
|
||||
# Local roles for user and the user's groups and all ancestors of the
|
||||
# passed object.
|
||||
while obj:
|
||||
ctype = ContentType.objects.get_for_model(obj)
|
||||
cursor.execute("""SELECT role_id
|
||||
FROM permissions_principalrolerelation
|
||||
WHERE (user_id='%s' OR group_id IN (%s))
|
||||
AND content_id='%s'
|
||||
AND content_type_id='%s'""" % (user.id, groups_ids_str, obj.id, ctype.id))
|
||||
|
||||
for row in cursor.fetchall():
|
||||
roles.append(row[0])
|
||||
|
||||
try:
|
||||
obj = obj.get_parent_for_permissions()
|
||||
except AttributeError:
|
||||
obj = None
|
||||
|
||||
return roles
|
||||
|
||||
def get_global_roles(principal):
|
||||
"""Returns *direct* global roles of passed principal (user or group).
|
||||
"""
|
||||
if isinstance(principal, User):
|
||||
return [prr.role for prr in PrincipalRoleRelation.objects.filter(
|
||||
user=principal, content_id=None, content_type=None)]
|
||||
else:
|
||||
if isinstance(principal, Group):
|
||||
principal = (principal,)
|
||||
return [prr.role for prr in PrincipalRoleRelation.objects.filter(
|
||||
group__in=principal, content_id=None, content_type=None)]
|
||||
|
||||
def get_local_roles(obj, principal):
|
||||
"""Returns *direct* local roles for passed principal and content object.
|
||||
"""
|
||||
ctype = ContentType.objects.get_for_model(obj)
|
||||
|
||||
if isinstance(principal, User):
|
||||
return [prr.role for prr in PrincipalRoleRelation.objects.filter(
|
||||
user=principal, content_id=obj.id, content_type=ctype)]
|
||||
else:
|
||||
return [prr.role for prr in PrincipalRoleRelation.objects.filter(
|
||||
group=principal, content_id=obj.id, content_type=ctype)]
|
||||
|
||||
# Permissions ################################################################
|
||||
|
||||
def check_permission(obj, user, codename, roles=None):
|
||||
"""Checks whether passed user has passed permission for passed obj.
|
||||
|
||||
**Parameters:**
|
||||
|
||||
obj
|
||||
The object for which the permission should be checked.
|
||||
|
||||
codename
|
||||
The permission's codename which should be checked.
|
||||
|
||||
user
|
||||
The user for which the permission should be checked.
|
||||
|
||||
roles
|
||||
If given these roles will be assigned to the user temporarily before
|
||||
the permissions are checked.
|
||||
"""
|
||||
if not has_permission(obj, user, codename):
|
||||
raise Unauthorized("User '%s' doesn't have permission '%s' for object '%s' (%s)"
|
||||
% (user, codename, obj.slug, obj.__class__.__name__))
|
||||
|
||||
def grant_permission(obj, role, permission):
|
||||
"""Grants passed permission to passed role. Returns True if the permission
|
||||
was able to be added, otherwise False.
|
||||
|
||||
**Parameters:**
|
||||
|
||||
obj
|
||||
The content object for which the permission should be granted.
|
||||
|
||||
role
|
||||
The role for which the permission should be granted.
|
||||
|
||||
permission
|
||||
The permission which should be granted. Either a permission
|
||||
object or the codename of a permission.
|
||||
"""
|
||||
if not isinstance(permission, Permission):
|
||||
try:
|
||||
permission = Permission.objects.get(codename = permission)
|
||||
except Permission.DoesNotExist:
|
||||
return False
|
||||
|
||||
ct = ContentType.objects.get_for_model(obj)
|
||||
try:
|
||||
ObjectPermission.objects.get(role=role, content_type = ct, content_id=obj.id, permission=permission)
|
||||
except ObjectPermission.DoesNotExist:
|
||||
ObjectPermission.objects.create(role=role, content=obj, permission=permission)
|
||||
|
||||
return True
|
||||
|
||||
def remove_permission(obj, role, permission):
|
||||
"""Removes passed permission from passed role and object. Returns True if
|
||||
the permission has been removed.
|
||||
|
||||
**Parameters:**
|
||||
|
||||
obj
|
||||
The content object for which a permission should be removed.
|
||||
|
||||
role
|
||||
The role for which a permission should be removed.
|
||||
|
||||
permission
|
||||
The permission which should be removed. Either a permission object
|
||||
or the codename of a permission.
|
||||
"""
|
||||
if not isinstance(permission, Permission):
|
||||
try:
|
||||
permission = Permission.objects.get(codename = permission)
|
||||
except Permission.DoesNotExist:
|
||||
return False
|
||||
|
||||
ct = ContentType.objects.get_for_model(obj)
|
||||
|
||||
try:
|
||||
op = ObjectPermission.objects.get(role=role, content_type = ct, content_id=obj.id, permission = permission)
|
||||
except ObjectPermission.DoesNotExist:
|
||||
return False
|
||||
|
||||
op.delete()
|
||||
return True
|
||||
|
||||
def has_permission(obj, user, codename, roles=None):
|
||||
"""Checks whether the passed user has passed permission for passed object.
|
||||
|
||||
**Parameters:**
|
||||
|
||||
obj
|
||||
The object for which the permission should be checked.
|
||||
|
||||
codename
|
||||
The permission's codename which should be checked.
|
||||
|
||||
request
|
||||
The current request.
|
||||
|
||||
roles
|
||||
If given these roles will be assigned to the user temporarily before
|
||||
the permissions are checked.
|
||||
"""
|
||||
cache_key = "%s-%s-%s" % (obj.content_type, obj.id, codename)
|
||||
result = _get_cached_permission(user, cache_key)
|
||||
if result is not None:
|
||||
return result
|
||||
|
||||
if roles is None:
|
||||
roles = []
|
||||
|
||||
if user.is_superuser:
|
||||
return True
|
||||
|
||||
if not user.is_anonymous():
|
||||
roles.extend(get_roles(user, obj))
|
||||
|
||||
ct = ContentType.objects.get_for_model(obj)
|
||||
|
||||
result = False
|
||||
while obj is not None:
|
||||
p = ObjectPermission.objects.filter(
|
||||
content_type=ct, content_id=obj.id, role__in=roles, permission__codename = codename).values("id")
|
||||
|
||||
if len(p) > 0:
|
||||
result = True
|
||||
break
|
||||
|
||||
if is_inherited(obj, codename) == False:
|
||||
result = False
|
||||
break
|
||||
|
||||
try:
|
||||
obj = obj.get_parent_for_permissions()
|
||||
ct = ContentType.objects.get_for_model(obj)
|
||||
except AttributeError:
|
||||
result = False
|
||||
break
|
||||
|
||||
_cache_permission(user, cache_key, result)
|
||||
return result
|
||||
|
||||
# Inheritance ################################################################
|
||||
|
||||
def add_inheritance_block(obj, permission):
|
||||
"""Adds an inheritance for the passed permission on the passed obj.
|
||||
|
||||
**Parameters:**
|
||||
|
||||
permission
|
||||
The permission for which an inheritance block should be added.
|
||||
Either a permission object or the codename of a permission.
|
||||
obj
|
||||
The content object for which an inheritance block should be added.
|
||||
"""
|
||||
if not isinstance(permission, Permission):
|
||||
try:
|
||||
permission = Permission.objects.get(codename = permission)
|
||||
except Permission.DoesNotExist:
|
||||
return False
|
||||
|
||||
ct = ContentType.objects.get_for_model(obj)
|
||||
try:
|
||||
ObjectPermissionInheritanceBlock.objects.get(content_type = ct, content_id=obj.id, permission=permission)
|
||||
except ObjectPermissionInheritanceBlock.DoesNotExist:
|
||||
try:
|
||||
result = ObjectPermissionInheritanceBlock.objects.create(content=obj, permission=permission)
|
||||
except IntegrityError:
|
||||
return False
|
||||
return True
|
||||
|
||||
def remove_inheritance_block(obj, permission):
|
||||
"""Removes a inheritance block for the passed permission from the passed
|
||||
object.
|
||||
|
||||
**Parameters:**
|
||||
|
||||
obj
|
||||
The content object for which an inheritance block should be added.
|
||||
|
||||
permission
|
||||
The permission for which an inheritance block should be removed.
|
||||
Either a permission object or the codename of a permission.
|
||||
"""
|
||||
if not isinstance(permission, Permission):
|
||||
try:
|
||||
permission = Permission.objects.get(codename = permission)
|
||||
except Permission.DoesNotExist:
|
||||
return False
|
||||
|
||||
ct = ContentType.objects.get_for_model(obj)
|
||||
try:
|
||||
opi = ObjectPermissionInheritanceBlock.objects.get(content_type = ct, content_id=obj.id, permission=permission)
|
||||
except ObjectPermissionInheritanceBlock.DoesNotExist:
|
||||
return False
|
||||
|
||||
opi.delete()
|
||||
return True
|
||||
|
||||
def is_inherited(obj, codename):
|
||||
"""Returns True if the passed permission is inherited for passed object.
|
||||
|
||||
**Parameters:**
|
||||
|
||||
obj
|
||||
The content object for which the permission should be checked.
|
||||
|
||||
codename
|
||||
The permission which should be checked. Must be the codename of
|
||||
the permission.
|
||||
"""
|
||||
ct = ContentType.objects.get_for_model(obj)
|
||||
try:
|
||||
ObjectPermissionInheritanceBlock.objects.get(
|
||||
content_type=ct, content_id=obj.id, permission__codename = codename)
|
||||
except ObjectDoesNotExist:
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
def get_group(id):
|
||||
"""Returns the group with passed id or None.
|
||||
"""
|
||||
try:
|
||||
return Group.objects.get(pk=id)
|
||||
except Group.DoesNotExist:
|
||||
return None
|
||||
|
||||
def get_role(id):
|
||||
"""Returns the role with passed id or None.
|
||||
"""
|
||||
try:
|
||||
return Role.objects.get(pk=id)
|
||||
except Role.DoesNotExist:
|
||||
return None
|
||||
|
||||
def get_user(id):
|
||||
"""Returns the user with passed id or None.
|
||||
"""
|
||||
try:
|
||||
return User.objects.get(pk=id)
|
||||
except User.DoesNotExist:
|
||||
return None
|
||||
|
||||
def has_group(user, group):
|
||||
"""Returns True if passed user has passed group.
|
||||
"""
|
||||
if isinstance(group, str):
|
||||
group = Group.objects.get(name=group)
|
||||
|
||||
return group in user.groups.all()
|
||||
|
||||
def reset(obj):
|
||||
"""Resets all permissions and inheritance blocks of passed object.
|
||||
"""
|
||||
ctype = ContentType.objects.get_for_model(obj)
|
||||
ObjectPermissionInheritanceBlock.objects.filter(content_id=obj.id, content_type=ctype).delete()
|
||||
ObjectPermission.objects.filter(content_id=obj.id, content_type=ctype).delete()
|
||||
|
||||
# Registering ################################################################
|
||||
|
||||
def register_permission(name, codename, ctypes=[]):
|
||||
"""Registers a permission to the framework. Returns the permission if the
|
||||
registration was successfully, otherwise False.
|
||||
|
||||
**Parameters:**
|
||||
|
||||
name
|
||||
The unique name of the permission. This is displayed to the
|
||||
customer.
|
||||
codename
|
||||
The unique codename of the permission. This is used internally to
|
||||
identify the permission.
|
||||
content_types
|
||||
The content type for which the permission is active. This can be
|
||||
used to display only reasonable permissions for an object. This
|
||||
must be a Django ContentType
|
||||
"""
|
||||
try:
|
||||
p = Permission.objects.create(name=name, codename=codename)
|
||||
|
||||
ctypes = [ContentType.objects.get_for_model(ctype) for ctype in ctypes]
|
||||
if ctypes:
|
||||
p.content_types = ctypes
|
||||
p.save()
|
||||
except IntegrityError:
|
||||
return False
|
||||
return p
|
||||
|
||||
def unregister_permission(codename):
|
||||
"""Unregisters a permission from the framework
|
||||
|
||||
**Parameters:**
|
||||
|
||||
codename
|
||||
The unique codename of the permission.
|
||||
"""
|
||||
try:
|
||||
permission = Permission.objects.get(codename=codename)
|
||||
except Permission.DoesNotExist:
|
||||
return False
|
||||
permission.delete()
|
||||
return True
|
||||
|
||||
def register_role(name):
|
||||
"""Registers a role with passed name to the framework. Returns the new
|
||||
role if the registration was successfully, otherwise False.
|
||||
|
||||
**Parameters:**
|
||||
|
||||
name
|
||||
The unique role name.
|
||||
"""
|
||||
try:
|
||||
role = Role.objects.create(name=name)
|
||||
except IntegrityError:
|
||||
return False
|
||||
return role
|
||||
|
||||
def unregister_role(name):
|
||||
"""Unregisters the role with passed name.
|
||||
|
||||
**Parameters:**
|
||||
|
||||
name
|
||||
The unique role name.
|
||||
"""
|
||||
try:
|
||||
role = Role.objects.get(name=name)
|
||||
except Role.DoesNotExist:
|
||||
return False
|
||||
|
||||
role.delete()
|
||||
return True
|
||||
|
||||
def register_group(name):
|
||||
"""Registers a group with passed name to the framework. Returns the new
|
||||
group if the registration was successfully, otherwise False.
|
||||
|
||||
Actually this creates just a default Django Group.
|
||||
|
||||
**Parameters:**
|
||||
|
||||
name
|
||||
The unique group name.
|
||||
"""
|
||||
try:
|
||||
group = Group.objects.create(name=name)
|
||||
except IntegrityError:
|
||||
return False
|
||||
return group
|
||||
|
||||
def unregister_group(name):
|
||||
"""Unregisters the group with passed name. Returns True if the
|
||||
unregistration was succesfull otherwise False.
|
||||
|
||||
Actually this deletes just a default Django Group.
|
||||
|
||||
**Parameters:**
|
||||
|
||||
name
|
||||
The unique role name.
|
||||
"""
|
||||
try:
|
||||
group = Group.objects.get(name=name)
|
||||
except Group.DoesNotExist:
|
||||
return False
|
||||
|
||||
group.delete()
|
||||
return True
|
||||
|
||||
def _cache_permission(user, cache_key, data):
|
||||
"""Stores the passed data on the passed user object.
|
||||
|
||||
**Parameters:**
|
||||
|
||||
user
|
||||
The user on which the data is stored.
|
||||
|
||||
cache_key
|
||||
The key under which the data is stored.
|
||||
|
||||
data
|
||||
The data which is stored.
|
||||
"""
|
||||
if not getattr(user, "permissions", None):
|
||||
user.permissions = {}
|
||||
user.permissions[cache_key] = data
|
||||
|
||||
def _get_cached_permission(user, cache_key):
|
||||
"""Returns the stored data from passed user object for passed cache_key.
|
||||
|
||||
**Parameters:**
|
||||
|
||||
user
|
||||
The user from which the data is retrieved.
|
||||
|
||||
cache_key
|
||||
The key under which the data is stored.
|
||||
|
||||
"""
|
||||
permissions = getattr(user, "permissions", None)
|
||||
if permissions:
|
||||
return user.permissions.get(cache_key, None)
|
61
workflows/__init__.py
Normal file
61
workflows/__init__.py
Normal file
|
@ -0,0 +1,61 @@
|
|||
# workflows imports
|
||||
import workflows.utils
|
||||
|
||||
class WorkflowBase(object):
|
||||
"""Mixin class to make objects workflow aware.
|
||||
"""
|
||||
def get_workflow(self):
|
||||
"""Returns the current workflow of the object.
|
||||
"""
|
||||
return workflows.utils.get_workflow(self)
|
||||
|
||||
def remove_workflow(self):
|
||||
"""Removes the workflow from the object. After this function has been
|
||||
called the object has no *own* workflow anymore (it might have one via
|
||||
its content type).
|
||||
|
||||
"""
|
||||
return workflows.utils.remove_workflow_from_object(self)
|
||||
|
||||
def set_workflow(self, workflow):
|
||||
"""Sets the passed workflow to the object. This will set the local
|
||||
workflow for the object.
|
||||
|
||||
If the object has already the given workflow nothing happens.
|
||||
Otherwise the object gets the passed workflow and the state is set to
|
||||
the workflow's initial state.
|
||||
|
||||
**Parameters:**
|
||||
|
||||
workflow
|
||||
The workflow which should be set to the object. Can be a Workflow
|
||||
instance or a string with the workflow name.
|
||||
obj
|
||||
The object which gets the passed workflow.
|
||||
"""
|
||||
return workflows.utils.set_workflow_for_object(workflow)
|
||||
|
||||
def get_state(self):
|
||||
"""Returns the current workflow state of the object.
|
||||
"""
|
||||
return workflows.utils.get_state(self)
|
||||
|
||||
def set_state(self, state):
|
||||
"""Sets the workflow state of the object.
|
||||
"""
|
||||
return workflows.utils.set_state(self, state)
|
||||
|
||||
def set_initial_state(self):
|
||||
"""Sets the initial state of the current workflow to the object.
|
||||
"""
|
||||
return self.set_state(self.get_workflow().initial_state)
|
||||
|
||||
def get_allowed_transitions(self, user):
|
||||
"""Returns allowed transitions for the current state.
|
||||
"""
|
||||
return workflows.utils.get_allowed_transitions(self, user)
|
||||
|
||||
def do_transition(self, transition, user):
|
||||
"""Processes the passed transition (if allowed).
|
||||
"""
|
||||
return workflows.utils.do_transition(self, transition, user)
|
30
workflows/admin.py
Normal file
30
workflows/admin.py
Normal file
|
@ -0,0 +1,30 @@
|
|||
from django.contrib import admin
|
||||
from workflows.models import State
|
||||
from workflows.models import StateInheritanceBlock
|
||||
from workflows.models import StatePermissionRelation
|
||||
from workflows.models import StateObjectRelation
|
||||
from workflows.models import Transition
|
||||
from workflows.models import Workflow
|
||||
from workflows.models import WorkflowObjectRelation
|
||||
from workflows.models import WorkflowModelRelation
|
||||
from workflows.models import WorkflowPermissionRelation
|
||||
|
||||
class StateInline(admin.TabularInline):
|
||||
model = State
|
||||
|
||||
class WorkflowAdmin(admin.ModelAdmin):
|
||||
inlines = [
|
||||
StateInline,
|
||||
]
|
||||
|
||||
admin.site.register(Workflow, WorkflowAdmin)
|
||||
|
||||
admin.site.register(State)
|
||||
admin.site.register(StateInheritanceBlock)
|
||||
admin.site.register(StateObjectRelation)
|
||||
admin.site.register(StatePermissionRelation)
|
||||
admin.site.register(Transition)
|
||||
admin.site.register(WorkflowObjectRelation)
|
||||
admin.site.register(WorkflowModelRelation)
|
||||
admin.site.register(WorkflowPermissionRelation)
|
||||
|
BIN
workflows/locale/de/LC_MESSAGES/django.mo
Normal file
BIN
workflows/locale/de/LC_MESSAGES/django.mo
Normal file
Binary file not shown.
60
workflows/locale/de/LC_MESSAGES/django.po
Normal file
60
workflows/locale/de/LC_MESSAGES/django.po
Normal file
|
@ -0,0 +1,60 @@
|
|||
# German translations for django-workflows
|
||||
# Copyright (C) 2010 Kai Diefenbach
|
||||
# This file is distributed under the same license as the PACKAGE package.
|
||||
# Kai Diefenbach <kai.diefenbach@iqpp.de>, 2010.
|
||||
#
|
||||
msgid ""
|
||||
msgstr ""
|
||||
"Project-Id-Version: 1.0\n"
|
||||
"Report-Msgid-Bugs-To: \n"
|
||||
"POT-Creation-Date: 2010-04-02 09:16+0200\n"
|
||||
"PO-Revision-Date: YEAR-MO-DA HO:MI+ZONE\n"
|
||||
"Last-Translator: FULL NAME <EMAIL@ADDRESS>\n"
|
||||
"Language-Team: LANGUAGE <LL@li.org>\n"
|
||||
"MIME-Version: 1.0\n"
|
||||
"Content-Type: text/plain; charset=UTF-8\n"
|
||||
"Content-Transfer-Encoding: 8bit\n"
|
||||
|
||||
#: models.py:98 models.py:199 models.py:237
|
||||
msgid "Name"
|
||||
msgstr "Name"
|
||||
|
||||
#: models.py:200 models.py:238 models.py:285 models.py:307
|
||||
msgid "Workflow"
|
||||
msgstr "Arbeitsablauf"
|
||||
|
||||
#: models.py:201
|
||||
msgid "Transitions"
|
||||
msgstr "Übergänge"
|
||||
|
||||
#: models.py:239
|
||||
msgid "Destination"
|
||||
msgstr "Ziel"
|
||||
|
||||
#: models.py:240
|
||||
msgid "Condition"
|
||||
msgstr "Kondition"
|
||||
|
||||
#: models.py:241 models.py:350 models.py:373
|
||||
msgid "Permission"
|
||||
msgstr "Recht"
|
||||
|
||||
#: models.py:258 models.py:282
|
||||
msgid "Content type"
|
||||
msgstr "Inhaltstyp"
|
||||
|
||||
#: models.py:259 models.py:283
|
||||
msgid "Content id"
|
||||
msgstr "Inhalts-ID"
|
||||
|
||||
#: models.py:261 models.py:349 models.py:372
|
||||
msgid "State"
|
||||
msgstr "Status"
|
||||
|
||||
#: models.py:306
|
||||
msgid "Content Type"
|
||||
msgstr "Inhaltstyp"
|
||||
|
||||
#: models.py:374
|
||||
msgid "Role"
|
||||
msgstr "Rolle"
|
357
workflows/models.py
Normal file
357
workflows/models.py
Normal file
|
@ -0,0 +1,357 @@
|
|||
from django.db import models
|
||||
|
||||
# django imports
|
||||
from django.contrib.contenttypes import generic
|
||||
from django.contrib.contenttypes.models import ContentType
|
||||
from django.utils.translation import ugettext_lazy as _
|
||||
|
||||
# permissions imports
|
||||
import permissions.utils
|
||||
from permissions.models import Permission
|
||||
from permissions.models import Role
|
||||
|
||||
class Workflow(models.Model):
|
||||
"""A workflow consists of a sequence of connected (through transitions)
|
||||
states. It can be assigned to a model and / or model instances. If a
|
||||
model instance has a workflow it takes precendence over the model's
|
||||
workflow.
|
||||
|
||||
**Attributes:**
|
||||
|
||||
model
|
||||
The model the workflow belongs to. Can be any
|
||||
|
||||
content
|
||||
The object the workflow belongs to.
|
||||
|
||||
name
|
||||
The unique name of the workflow.
|
||||
|
||||
states
|
||||
The states of the workflow.
|
||||
|
||||
initial_state
|
||||
The initial state the model / content gets if created.
|
||||
|
||||
"""
|
||||
name = models.CharField(_(u"Name"), max_length=100, unique=True)
|
||||
initial_state = models.ForeignKey("State", related_name="workflow_state", blank=True, null=True)
|
||||
permissions = models.ManyToManyField(Permission, symmetrical=False, through="WorkflowPermissionRelation")
|
||||
|
||||
def __unicode__(self):
|
||||
return self.name
|
||||
|
||||
def get_initial_state(self):
|
||||
"""Returns the initial state of the workflow. Takes the first one if
|
||||
no state has been defined.
|
||||
"""
|
||||
if self.initial_state:
|
||||
return self.initial_state
|
||||
else:
|
||||
try:
|
||||
return self.states.all()[0]
|
||||
except IndexError:
|
||||
return None
|
||||
|
||||
def get_objects(self):
|
||||
"""Returns all objects which have this workflow assigned. Globally
|
||||
(via the object's content type) or locally (via the object itself).
|
||||
"""
|
||||
import workflows.utils
|
||||
objs = []
|
||||
|
||||
# Get all objects whose content type has this workflow
|
||||
for wmr in WorkflowModelRelation.objects.filter(workflow=self):
|
||||
ctype = wmr.content_type
|
||||
# We have also to check whether the global workflow is not
|
||||
# overwritten.
|
||||
for obj in ctype.model_class().objects.all():
|
||||
if workflows.utils.get_workflow(obj) == self:
|
||||
objs.append(obj)
|
||||
|
||||
# Get all objects whose local workflow this workflow
|
||||
for wor in WorkflowObjectRelation.objects.filter(workflow=self):
|
||||
if wor.content not in objs:
|
||||
objs.append(wor.content)
|
||||
|
||||
return objs
|
||||
|
||||
def set_to(self, ctype_or_obj):
|
||||
"""Sets the workflow to passed content type or object. See the specific
|
||||
methods for more information.
|
||||
|
||||
**Parameters:**
|
||||
|
||||
ctype_or_obj
|
||||
The content type or the object to which the workflow should be set.
|
||||
Can be either a ContentType instance or any Django model instance.
|
||||
"""
|
||||
if isinstance(ctype_or_obj, ContentType):
|
||||
return self.set_to_model(ctype_or_obj)
|
||||
else:
|
||||
return self.set_to_object(ctype_or_obj)
|
||||
|
||||
def set_to_model(self, ctype):
|
||||
"""Sets the workflow to the passed content type. If the content
|
||||
type has already an assigned workflow the workflow is overwritten.
|
||||
|
||||
**Parameters:**
|
||||
|
||||
ctype
|
||||
The content type which gets the workflow. Can be any Django model
|
||||
instance.
|
||||
"""
|
||||
try:
|
||||
wor = WorkflowModelRelation.objects.get(content_type=ctype)
|
||||
except WorkflowModelRelation.DoesNotExist:
|
||||
WorkflowModelRelation.objects.create(content_type=ctype, workflow=self)
|
||||
else:
|
||||
wor.workflow = self
|
||||
wor.save()
|
||||
|
||||
def set_to_object(self, obj):
|
||||
"""Sets the workflow to the passed object.
|
||||
|
||||
If the object has already the given workflow nothing happens. Otherwise
|
||||
the workflow is set to the objectthe state is set to the workflow's
|
||||
initial state.
|
||||
|
||||
**Parameters:**
|
||||
|
||||
obj
|
||||
The object which gets the workflow.
|
||||
"""
|
||||
import workflows.utils
|
||||
|
||||
ctype = ContentType.objects.get_for_model(obj)
|
||||
try:
|
||||
wor = WorkflowObjectRelation.objects.get(content_type=ctype, content_id=obj.id)
|
||||
except WorkflowObjectRelation.DoesNotExist:
|
||||
WorkflowObjectRelation.objects.create(content = obj, workflow=self)
|
||||
workflows.utils.set_state(obj, self.initial_state)
|
||||
else:
|
||||
if wor.workflow != self:
|
||||
wor.workflow = self
|
||||
wor.save()
|
||||
workflows.utils.set_state(self.initial_state)
|
||||
|
||||
class State(models.Model):
|
||||
"""A certain state within workflow.
|
||||
|
||||
**Attributes:**
|
||||
|
||||
name
|
||||
The unique name of the state within the workflow.
|
||||
|
||||
workflow
|
||||
The workflow to which the state belongs.
|
||||
|
||||
transitions
|
||||
The transitions of a workflow state.
|
||||
|
||||
"""
|
||||
name = models.CharField(_(u"Name"), max_length=100)
|
||||
workflow = models.ForeignKey(Workflow, verbose_name=_(u"Workflow"), related_name="states")
|
||||
transitions = models.ManyToManyField("Transition", verbose_name=_(u"Transitions"), blank=True, null=True, related_name="states")
|
||||
|
||||
class Meta:
|
||||
ordering = ("name", )
|
||||
|
||||
def __unicode__(self):
|
||||
return "%s (%s)" % (self.name, self.workflow.name)
|
||||
|
||||
def get_allowed_transitions(self, obj, user):
|
||||
"""Returns all allowed transitions for passed object and user.
|
||||
"""
|
||||
transitions = []
|
||||
for transition in self.transitions.all():
|
||||
permission = transition.permission
|
||||
if permission is None:
|
||||
transitions.append(transition)
|
||||
else:
|
||||
# First we try to get the objects specific has_permission
|
||||
# method (in case the object inherits from the PermissionBase
|
||||
# class).
|
||||
try:
|
||||
if obj.has_permission(user, permission.codename):
|
||||
transitions.append(transition)
|
||||
except AttributeError:
|
||||
if permissions.utils.has_permission(obj, user, permission.codename):
|
||||
transitions.append(transition)
|
||||
return transitions
|
||||
|
||||
class Transition(models.Model):
|
||||
"""A transition from a source to a destination state. The transition can
|
||||
be used from several source states.
|
||||
|
||||
**Attributes:**
|
||||
|
||||
name
|
||||
The unique name of the transition within a workflow.
|
||||
|
||||
workflow
|
||||
The workflow to which the transition belongs. Must be a Workflow
|
||||
instance.
|
||||
|
||||
destination
|
||||
The state after a transition has been processed. Must be a State
|
||||
instance.
|
||||
|
||||
condition
|
||||
The condition when the transition is available. Can be any python
|
||||
expression.
|
||||
|
||||
permission
|
||||
The necessary permission to process the transition. Must be a
|
||||
Permission instance.
|
||||
|
||||
"""
|
||||
name = models.CharField(_(u"Name"), max_length=100)
|
||||
workflow = models.ForeignKey(Workflow, verbose_name=_(u"Workflow"), related_name="transitions")
|
||||
destination = models.ForeignKey(State, verbose_name=_(u"Destination"), null=True, blank=True, related_name="destination_state")
|
||||
condition = models.CharField(_(u"Condition"), blank=True, max_length=100)
|
||||
permission = models.ForeignKey(Permission, verbose_name=_(u"Permission"), blank=True, null=True)
|
||||
|
||||
def __unicode__(self):
|
||||
return self.name
|
||||
|
||||
class StateObjectRelation(models.Model):
|
||||
"""Stores the workflow state of an object.
|
||||
|
||||
Provides a way to give any object a workflow state without changing the
|
||||
object's model.
|
||||
|
||||
**Attributes:**
|
||||
|
||||
content
|
||||
The object for which the state is stored. This can be any instance of
|
||||
a Django model.
|
||||
|
||||
state
|
||||
The state of content. This must be a State instance.
|
||||
"""
|
||||
content_type = models.ForeignKey(ContentType, verbose_name=_(u"Content type"), related_name="state_object", blank=True, null=True)
|
||||
content_id = models.PositiveIntegerField(_(u"Content id"), blank=True, null=True)
|
||||
content = generic.GenericForeignKey(ct_field="content_type", fk_field="content_id")
|
||||
state = models.ForeignKey(State, verbose_name = _(u"State"))
|
||||
|
||||
def __unicode__(self):
|
||||
return "%s %s - %s" % (self.content_type.name, self.content_id, self.state.name)
|
||||
|
||||
class Meta:
|
||||
unique_together = ("content_type", "content_id", "state")
|
||||
|
||||
class WorkflowObjectRelation(models.Model):
|
||||
"""Stores an workflow of an object.
|
||||
|
||||
Provides a way to give any object a workflow without changing the object's
|
||||
model.
|
||||
|
||||
**Attributes:**
|
||||
|
||||
content
|
||||
The object for which the workflow is stored. This can be any instance of
|
||||
a Django model.
|
||||
|
||||
workflow
|
||||
The workflow which is assigned to an object. This needs to be a workflow
|
||||
instance.
|
||||
"""
|
||||
content_type = models.ForeignKey(ContentType, verbose_name=_(u"Content type"), related_name="workflow_object", blank=True, null=True)
|
||||
content_id = models.PositiveIntegerField(_(u"Content id"), blank=True, null=True)
|
||||
content = generic.GenericForeignKey(ct_field="content_type", fk_field="content_id")
|
||||
workflow = models.ForeignKey(Workflow, verbose_name=_(u"Workflow"), related_name="wors")
|
||||
|
||||
class Meta:
|
||||
unique_together = ("content_type", "content_id")
|
||||
|
||||
def __unicode__(self):
|
||||
return "%s %s - %s" % (self.content_type.name, self.content_id, self.workflow.name)
|
||||
|
||||
class WorkflowModelRelation(models.Model):
|
||||
"""Stores an workflow for a model (ContentType).
|
||||
|
||||
Provides a way to give any object a workflow without changing the model.
|
||||
|
||||
**Attributes:**
|
||||
|
||||
Content Type
|
||||
The content type for which the workflow is stored. This can be any
|
||||
instance of a Django model.
|
||||
|
||||
workflow
|
||||
The workflow which is assigned to an object. This needs to be a
|
||||
workflow instance.
|
||||
"""
|
||||
content_type = models.ForeignKey(ContentType, verbose_name=_(u"Content Type"), unique=True)
|
||||
workflow = models.ForeignKey(Workflow, verbose_name=_(u"Workflow"), related_name="wmrs")
|
||||
|
||||
def __unicode__(self):
|
||||
return "%s - %s" % (self.content_type.name, self.workflow.name)
|
||||
|
||||
# Permissions relation #######################################################
|
||||
|
||||
class WorkflowPermissionRelation(models.Model):
|
||||
"""Stores the permissions for which a workflow is responsible.
|
||||
|
||||
**Attributes:**
|
||||
|
||||
workflow
|
||||
The workflow which is responsible for the permissions. Needs to be a
|
||||
Workflow instance.
|
||||
|
||||
permission
|
||||
The permission for which the workflow is responsible. Needs to be a
|
||||
Permission instance.
|
||||
"""
|
||||
workflow = models.ForeignKey(Workflow)
|
||||
permission = models.ForeignKey(Permission, related_name="permissions")
|
||||
|
||||
class Meta:
|
||||
unique_together = ("workflow", "permission")
|
||||
|
||||
def __unicode__(self):
|
||||
return "%s %s" % (self.workflow.name, self.permission.name)
|
||||
|
||||
class StateInheritanceBlock(models.Model):
|
||||
"""Stores inheritance block for state and permission.
|
||||
|
||||
**Attributes:**
|
||||
|
||||
state
|
||||
The state for which the inheritance is blocked. Needs to be a State
|
||||
instance.
|
||||
|
||||
permission
|
||||
The permission for which the instance is blocked. Needs to be a
|
||||
Permission instance.
|
||||
"""
|
||||
state = models.ForeignKey(State, verbose_name=_(u"State"))
|
||||
permission = models.ForeignKey(Permission, verbose_name=_(u"Permission"))
|
||||
|
||||
def __unicode__(self):
|
||||
return "%s %s" % (self.state.name, self.permission.name)
|
||||
|
||||
class StatePermissionRelation(models.Model):
|
||||
"""Stores granted permission for state and role.
|
||||
|
||||
**Attributes:**
|
||||
|
||||
state
|
||||
The state for which the role has the permission. Needs to be a State
|
||||
instance.
|
||||
|
||||
permission
|
||||
The permission for which the workflow is responsible. Needs to be a
|
||||
Permission instance.
|
||||
|
||||
role
|
||||
The role for which the state has the permission. Needs to be a lfc
|
||||
Role instance.
|
||||
"""
|
||||
state = models.ForeignKey(State, verbose_name=_(u"State"))
|
||||
permission = models.ForeignKey(Permission, verbose_name=_(u"Permission"))
|
||||
role = models.ForeignKey(Role, verbose_name=_(u"Role"))
|
||||
|
||||
def __unicode__(self):
|
||||
return "%s %s %s" % (self.state.name, self.role.name, self.permission.name)
|
10
workflows/templates/workflows/transitions.html
Normal file
10
workflows/templates/workflows/transitions.html
Normal file
|
@ -0,0 +1,10 @@
|
|||
{% load i18n %}
|
||||
<label>{{ state.name }}</label>
|
||||
<select name="transition">
|
||||
{% for transition in transitions %}
|
||||
<option {% if transition.selected %}selected="selected"{% endif %}
|
||||
value="{{ transition.id }}">
|
||||
{{ transition.name }}
|
||||
</option>
|
||||
{% endfor %}
|
||||
</select>
|
0
workflows/templatetags/__init__.py
Normal file
0
workflows/templatetags/__init__.py
Normal file
18
workflows/templatetags/workflows_tags.py
Normal file
18
workflows/templatetags/workflows_tags.py
Normal file
|
@ -0,0 +1,18 @@
|
|||
# django imports
|
||||
from django import template
|
||||
|
||||
# workflows imports
|
||||
import workflows.utils
|
||||
|
||||
register = template.Library()
|
||||
|
||||
@register.inclusion_tag('workflows/transitions.html', takes_context=True)
|
||||
def transitions(context, obj):
|
||||
"""
|
||||
"""
|
||||
request = context.get("request")
|
||||
|
||||
return {
|
||||
"transitions" : workflows.utils.get_allowed_transitions(obj, request.user),
|
||||
"state" : workflows.utils.get_state(obj),
|
||||
}
|
600
workflows/tests.py
Normal file
600
workflows/tests.py
Normal file
|
@ -0,0 +1,600 @@
|
|||
# django imports
|
||||
from django.contrib.contenttypes.models import ContentType
|
||||
from django.contrib.flatpages.models import FlatPage
|
||||
from django.test import TestCase
|
||||
from django.contrib.auth.models import User
|
||||
from django.contrib.sessions.backends.file import SessionStore
|
||||
from django.core.handlers.wsgi import WSGIRequest
|
||||
from django.test.client import Client
|
||||
|
||||
# workflows import
|
||||
import permissions.utils
|
||||
import workflows.utils
|
||||
from workflows.models import State
|
||||
from workflows.models import StateInheritanceBlock
|
||||
from workflows.models import StatePermissionRelation
|
||||
from workflows.models import StateObjectRelation
|
||||
from workflows.models import Transition
|
||||
from workflows.models import Workflow
|
||||
from workflows.models import WorkflowModelRelation
|
||||
from workflows.models import WorkflowObjectRelation
|
||||
from workflows.models import WorkflowPermissionRelation
|
||||
|
||||
class WorkflowTestCase(TestCase):
|
||||
"""Tests a simple workflow without permissions.
|
||||
"""
|
||||
def setUp(self):
|
||||
"""
|
||||
"""
|
||||
create_workflow(self)
|
||||
|
||||
def test_get_states(self):
|
||||
"""
|
||||
"""
|
||||
states = self.w.states.all()
|
||||
self.assertEqual(states[0], self.private)
|
||||
self.assertEqual(states[1], self.public)
|
||||
|
||||
def test_unicode(self):
|
||||
"""
|
||||
"""
|
||||
self.assertEqual(self.w.__unicode__(), u"Standard")
|
||||
|
||||
class PermissionsTestCase(TestCase):
|
||||
"""Tests a simple workflow with permissions.
|
||||
"""
|
||||
def setUp(self):
|
||||
"""
|
||||
"""
|
||||
create_workflow(self)
|
||||
|
||||
# Register roles
|
||||
self.anonymous = permissions.utils.register_role("Anonymous")
|
||||
self.owner = permissions.utils.register_role("Owner")
|
||||
|
||||
self.user = User.objects.create(username="john")
|
||||
permissions.utils.add_role(self.user, self.owner)
|
||||
|
||||
# Example content type
|
||||
self.page_1 = FlatPage.objects.create(url="/page-1/", title="Page 1")
|
||||
|
||||
# Registers permissions
|
||||
self.view = permissions.utils.register_permission("View", "view")
|
||||
self.edit = permissions.utils.register_permission("Edit", "edit")
|
||||
|
||||
# Add all permissions which are managed by the workflow
|
||||
wpr = WorkflowPermissionRelation.objects.create(workflow=self.w, permission=self.view)
|
||||
wpr = WorkflowPermissionRelation.objects.create(workflow=self.w, permission=self.edit)
|
||||
|
||||
# Add permissions for single states
|
||||
spr = StatePermissionRelation.objects.create(state=self.public, permission=self.view, role=self.owner)
|
||||
spr = StatePermissionRelation.objects.create(state=self.private, permission=self.view, role=self.owner)
|
||||
spr = StatePermissionRelation.objects.create(state=self.private, permission=self.edit, role=self.owner)
|
||||
|
||||
# Add inheritance block for single states
|
||||
sib = StateInheritanceBlock.objects.create(state=self.private, permission=self.view)
|
||||
sib = StateInheritanceBlock.objects.create(state=self.private, permission=self.edit)
|
||||
sib = StateInheritanceBlock.objects.create(state=self.public, permission=self.edit)
|
||||
|
||||
workflows.utils.set_workflow(self.page_1, self.w)
|
||||
|
||||
def test_set_state(self):
|
||||
"""
|
||||
"""
|
||||
# Permissions
|
||||
result = permissions.utils.has_permission(self.page_1, self.user, "edit")
|
||||
self.assertEqual(result, True)
|
||||
|
||||
result = permissions.utils.has_permission(self.page_1, self.user, "view")
|
||||
self.assertEqual(result, True)
|
||||
|
||||
# Inheritance
|
||||
result = permissions.utils.is_inherited(self.page_1, "view")
|
||||
self.assertEqual(result, False)
|
||||
|
||||
result = permissions.utils.is_inherited(self.page_1, "edit")
|
||||
self.assertEqual(result, False)
|
||||
|
||||
# Change state
|
||||
workflows.utils.set_state(self.page_1, self.public)
|
||||
|
||||
# Permissions
|
||||
result = permissions.utils.has_permission(self.page_1, self.user, "edit")
|
||||
self.assertEqual(result, False)
|
||||
|
||||
result = permissions.utils.has_permission(self.page_1, self.user, "view")
|
||||
self.assertEqual(result, True)
|
||||
|
||||
# Inheritance
|
||||
result = permissions.utils.is_inherited(self.page_1, "view")
|
||||
self.assertEqual(result, True)
|
||||
|
||||
result = permissions.utils.is_inherited(self.page_1, "edit")
|
||||
self.assertEqual(result, False)
|
||||
|
||||
def test_set_initial_state(self):
|
||||
"""
|
||||
"""
|
||||
state = workflows.utils.get_state(self.page_1)
|
||||
self.assertEqual(state.name, self.private.name)
|
||||
|
||||
workflows.utils.do_transition(self.page_1, self.make_public, self.user)
|
||||
state = workflows.utils.get_state(self.page_1)
|
||||
self.assertEqual(state.name, self.public.name)
|
||||
|
||||
workflows.utils.set_initial_state(self.page_1)
|
||||
state = workflows.utils.get_state(self.page_1)
|
||||
self.assertEqual(state.name, self.private.name)
|
||||
|
||||
def test_do_transition(self):
|
||||
"""
|
||||
"""
|
||||
state = workflows.utils.get_state(self.page_1)
|
||||
self.assertEqual(state.name, self.private.name)
|
||||
|
||||
# by transition
|
||||
workflows.utils.do_transition(self.page_1, self.make_public, self.user)
|
||||
|
||||
state = workflows.utils.get_state(self.page_1)
|
||||
self.assertEqual(state.name, self.public.name)
|
||||
|
||||
# by name
|
||||
workflows.utils.do_transition(self.page_1, "Make private", self.user)
|
||||
|
||||
state = workflows.utils.get_state(self.page_1)
|
||||
self.assertEqual(state.name, self.private.name)
|
||||
|
||||
# name which does not exist
|
||||
result = workflows.utils.do_transition(self.page_1, "Make pending", self.user)
|
||||
self.assertEqual(result, False)
|
||||
|
||||
wrong = Transition.objects.create(name="Wrong", workflow=self.w, destination = self.public)
|
||||
|
||||
# name which does not exist
|
||||
result = workflows.utils.do_transition(self.page_1, wrong, self.user)
|
||||
self.assertEqual(result, False)
|
||||
|
||||
class UtilsTestCase(TestCase):
|
||||
"""Tests various methods of the utils module.
|
||||
"""
|
||||
def setUp(self):
|
||||
"""
|
||||
"""
|
||||
create_workflow(self)
|
||||
self.user = User.objects.create()
|
||||
|
||||
def test_workflow(self):
|
||||
"""
|
||||
"""
|
||||
workflows.utils.set_workflow(self.user, self.w)
|
||||
result = workflows.utils.get_workflow(self.user)
|
||||
self.assertEqual(result, self.w)
|
||||
|
||||
def test_state(self):
|
||||
"""
|
||||
"""
|
||||
result = workflows.utils.get_state(self.user)
|
||||
self.assertEqual(result, None)
|
||||
|
||||
workflows.utils.set_workflow(self.user, self.w)
|
||||
result = workflows.utils.get_state(self.user)
|
||||
self.assertEqual(result, self.w.initial_state)
|
||||
|
||||
def test_set_workflow_1(self):
|
||||
"""Set worklow by object
|
||||
"""
|
||||
ctype = ContentType.objects.get_for_model(self.user)
|
||||
|
||||
result = workflows.utils.get_workflow(self.user)
|
||||
self.assertEqual(result, None)
|
||||
|
||||
wp = Workflow.objects.create(name="Portal")
|
||||
|
||||
# Set for model
|
||||
workflows.utils.set_workflow_for_model(ctype, wp)
|
||||
|
||||
result = workflows.utils.get_workflow_for_model(ctype)
|
||||
self.assertEqual(result, wp)
|
||||
|
||||
result = workflows.utils.get_workflow(self.user)
|
||||
self.assertEqual(result, wp)
|
||||
|
||||
# Set for object
|
||||
workflows.utils.set_workflow_for_object(self.user, self.w)
|
||||
result = workflows.utils.get_workflow(self.user)
|
||||
self.assertEqual(result, self.w)
|
||||
|
||||
# The model still have wp
|
||||
result = workflows.utils.get_workflow_for_model(ctype)
|
||||
self.assertEqual(result, wp)
|
||||
|
||||
def test_set_workflow_2(self):
|
||||
"""Set worklow by name
|
||||
"""
|
||||
ctype = ContentType.objects.get_for_model(self.user)
|
||||
|
||||
result = workflows.utils.get_workflow(self.user)
|
||||
self.assertEqual(result, None)
|
||||
|
||||
wp = Workflow.objects.create(name="Portal")
|
||||
|
||||
# Set for model
|
||||
workflows.utils.set_workflow_for_model(ctype, "Portal")
|
||||
|
||||
result = workflows.utils.get_workflow_for_model(ctype)
|
||||
self.assertEqual(result, wp)
|
||||
|
||||
result = workflows.utils.get_workflow(self.user)
|
||||
self.assertEqual(result, wp)
|
||||
|
||||
# Set for object
|
||||
workflows.utils.set_workflow_for_object(self.user, "Standard")
|
||||
result = workflows.utils.get_workflow(self.user)
|
||||
self.assertEqual(result, self.w)
|
||||
|
||||
# The model still have wp
|
||||
result = workflows.utils.get_workflow_for_model(ctype)
|
||||
self.assertEqual(result, wp)
|
||||
|
||||
# Workflow which does not exist
|
||||
result = workflows.utils.set_workflow_for_model(ctype, "Wrong")
|
||||
self.assertEqual(result, False)
|
||||
|
||||
result = workflows.utils.set_workflow_for_object(self.user, "Wrong")
|
||||
self.assertEqual(result, False)
|
||||
|
||||
def test_get_objects_for_workflow_1(self):
|
||||
"""Workflow is added to object.
|
||||
"""
|
||||
result = workflows.utils.get_objects_for_workflow(self.w)
|
||||
self.assertEqual(result, [])
|
||||
|
||||
workflows.utils.set_workflow(self.user, self.w)
|
||||
result = workflows.utils.get_objects_for_workflow(self.w)
|
||||
self.assertEqual(result, [self.user])
|
||||
|
||||
def test_get_objects_for_workflow_2(self):
|
||||
"""Workflow is added to content type.
|
||||
"""
|
||||
result = workflows.utils.get_objects_for_workflow(self.w)
|
||||
self.assertEqual(result, [])
|
||||
|
||||
ctype = ContentType.objects.get_for_model(self.user)
|
||||
workflows.utils.set_workflow(ctype, self.w)
|
||||
result = workflows.utils.get_objects_for_workflow(self.w)
|
||||
self.assertEqual(result, [self.user])
|
||||
|
||||
def test_get_objects_for_workflow_3(self):
|
||||
"""Workflow is added to content type and object.
|
||||
"""
|
||||
result = workflows.utils.get_objects_for_workflow(self.w)
|
||||
self.assertEqual(result, [])
|
||||
|
||||
workflows.utils.set_workflow(self.user, self.w)
|
||||
result = workflows.utils.get_objects_for_workflow(self.w)
|
||||
self.assertEqual(result, [self.user])
|
||||
|
||||
ctype = ContentType.objects.get_for_model(self.user)
|
||||
workflows.utils.set_workflow(ctype, self.w)
|
||||
result = workflows.utils.get_objects_for_workflow(self.w)
|
||||
self.assertEqual(result, [self.user])
|
||||
|
||||
def test_get_objects_for_workflow_4(self):
|
||||
"""Get workflow by name
|
||||
"""
|
||||
result = workflows.utils.get_objects_for_workflow("Standard")
|
||||
self.assertEqual(result, [])
|
||||
|
||||
workflows.utils.set_workflow(self.user, self.w)
|
||||
result = workflows.utils.get_objects_for_workflow("Standard")
|
||||
self.assertEqual(result, [self.user])
|
||||
|
||||
# Workflow which does not exist
|
||||
result = workflows.utils.get_objects_for_workflow("Wrong")
|
||||
self.assertEqual(result, [])
|
||||
|
||||
def test_remove_workflow_from_model(self):
|
||||
"""
|
||||
"""
|
||||
ctype = ContentType.objects.get_for_model(self.user)
|
||||
|
||||
result = workflows.utils.get_workflow(ctype)
|
||||
self.assertEqual(result, None)
|
||||
|
||||
workflows.utils.set_workflow_for_model(ctype, self.w)
|
||||
|
||||
result = workflows.utils.get_workflow_for_model(ctype)
|
||||
self.assertEqual(result, self.w)
|
||||
|
||||
result = workflows.utils.get_workflow(self.user)
|
||||
self.assertEqual(result, self.w)
|
||||
|
||||
workflows.utils.remove_workflow_from_model(ctype)
|
||||
|
||||
result = workflows.utils.get_workflow_for_model(ctype)
|
||||
self.assertEqual(result, None)
|
||||
|
||||
result = workflows.utils.get_workflow_for_object(self.user)
|
||||
self.assertEqual(result, None)
|
||||
|
||||
def test_remove_workflow_from_object(self):
|
||||
"""
|
||||
"""
|
||||
result = workflows.utils.get_workflow(self.user)
|
||||
self.assertEqual(result, None)
|
||||
|
||||
workflows.utils.set_workflow_for_object(self.user, self.w)
|
||||
|
||||
result = workflows.utils.get_workflow(self.user)
|
||||
self.assertEqual(result, self.w)
|
||||
|
||||
result = workflows.utils.remove_workflow_from_object(self.user)
|
||||
self.assertEqual(result, None)
|
||||
|
||||
def test_remove_workflow_1(self):
|
||||
"""Removes workflow from model
|
||||
"""
|
||||
ctype = ContentType.objects.get_for_model(self.user)
|
||||
|
||||
result = workflows.utils.get_workflow(ctype)
|
||||
self.assertEqual(result, None)
|
||||
|
||||
workflows.utils.set_workflow_for_model(ctype, self.w)
|
||||
|
||||
result = workflows.utils.get_workflow_for_model(ctype)
|
||||
self.assertEqual(result, self.w)
|
||||
|
||||
result = workflows.utils.get_workflow(self.user)
|
||||
self.assertEqual(result, self.w)
|
||||
|
||||
workflows.utils.remove_workflow(ctype)
|
||||
|
||||
result = workflows.utils.get_workflow_for_model(ctype)
|
||||
self.assertEqual(result, None)
|
||||
|
||||
result = workflows.utils.get_workflow_for_object(self.user)
|
||||
self.assertEqual(result, None)
|
||||
|
||||
def test_remove_workflow_2(self):
|
||||
"""Removes workflow from object
|
||||
"""
|
||||
result = workflows.utils.get_workflow(self.user)
|
||||
self.assertEqual(result, None)
|
||||
|
||||
workflows.utils.set_workflow_for_object(self.user, self.w)
|
||||
|
||||
result = workflows.utils.get_workflow(self.user)
|
||||
self.assertEqual(result, self.w)
|
||||
|
||||
result = workflows.utils.remove_workflow(self.user)
|
||||
self.assertEqual(result, None)
|
||||
|
||||
def test_get_allowed_transitions(self):
|
||||
"""Tests get_allowed_transitions method
|
||||
"""
|
||||
page_1 = FlatPage.objects.create(url="/page-1/", title="Page 1")
|
||||
role_1 = permissions.utils.register_role("Role 1")
|
||||
permissions.utils.add_role(self.user, role_1)
|
||||
|
||||
view = permissions.utils.register_permission("Publish", "publish")
|
||||
|
||||
transitions = self.private.get_allowed_transitions(page_1, self.user)
|
||||
self.assertEqual(len(transitions), 1)
|
||||
|
||||
# protect the transition with a permission
|
||||
self.make_public.permission = view
|
||||
self.make_public.save()
|
||||
|
||||
# user has no transition
|
||||
transitions = self.private.get_allowed_transitions(page_1, self.user)
|
||||
self.assertEqual(len(transitions), 0)
|
||||
|
||||
# grant permission
|
||||
permissions.utils.grant_permission(page_1, role_1, view)
|
||||
|
||||
# user has transition again
|
||||
transitions = self.private.get_allowed_transitions(page_1, self.user)
|
||||
self.assertEqual(len(transitions), 1)
|
||||
|
||||
def test_get_workflow_for_object(self):
|
||||
"""
|
||||
"""
|
||||
result = workflows.utils.get_workflow(self.user)
|
||||
self.assertEqual(result, None)
|
||||
|
||||
# Set workflow for a user
|
||||
workflows.utils.set_workflow_for_object(self.user, self.w)
|
||||
|
||||
# Get workflow for the user
|
||||
result = workflows.utils.get_workflow_for_object(self.user)
|
||||
self.assertEqual(result, self.w)
|
||||
|
||||
# Set workflow for a FlatPage
|
||||
page_1 = FlatPage.objects.create(url="/page-1/", title="Page 1")
|
||||
workflows.utils.set_workflow_for_object(page_1, self.w)
|
||||
|
||||
result = workflows.utils.get_workflow_for_object(self.user)
|
||||
self.assertEqual(result, self.w)
|
||||
|
||||
result = workflows.utils.get_workflow_for_object(page_1)
|
||||
self.assertEqual(result, self.w)
|
||||
|
||||
class StateTestCase(TestCase):
|
||||
"""Tests the State model
|
||||
"""
|
||||
def setUp(self):
|
||||
"""
|
||||
"""
|
||||
create_workflow(self)
|
||||
self.user = User.objects.create()
|
||||
self.role_1 = permissions.utils.register_role("Role 1")
|
||||
permissions.utils.add_role(self.user, self.role_1)
|
||||
self.page_1 = FlatPage.objects.create(url="/page-1/", title="Page 1")
|
||||
|
||||
def test_unicode(self):
|
||||
"""
|
||||
"""
|
||||
self.assertEqual(self.private.__unicode__(), u"Private (Standard)")
|
||||
|
||||
def test_transitions(self):
|
||||
"""
|
||||
"""
|
||||
transitions = self.public.transitions.all()
|
||||
self.assertEqual(len(transitions), 1)
|
||||
self.assertEqual(transitions[0], self.make_private)
|
||||
|
||||
transitions = self.private.transitions.all()
|
||||
self.assertEqual(len(transitions), 1)
|
||||
self.assertEqual(transitions[0], self.make_public)
|
||||
|
||||
def test_get_transitions(self):
|
||||
"""
|
||||
"""
|
||||
transitions = self.private.get_allowed_transitions(self.page_1, self.user)
|
||||
self.assertEqual(len(transitions), 1)
|
||||
self.assertEqual(transitions[0], self.make_public)
|
||||
|
||||
transitions = self.public.get_allowed_transitions(self.page_1, self.user)
|
||||
self.assertEqual(len(transitions), 1)
|
||||
self.assertEqual(transitions[0], self.make_private)
|
||||
|
||||
def test_get_allowed_transitions(self):
|
||||
"""
|
||||
"""
|
||||
self.view = permissions.utils.register_permission("Publish", "publish")
|
||||
transitions = self.private.get_allowed_transitions(self.page_1, self.user)
|
||||
self.assertEqual(len(transitions), 1)
|
||||
|
||||
# protect the transition with a permission
|
||||
self.make_public.permission = self.view
|
||||
self.make_public.save()
|
||||
|
||||
# user has no transition
|
||||
transitions = self.private.get_allowed_transitions(self.page_1, self.user)
|
||||
self.assertEqual(len(transitions), 0)
|
||||
|
||||
# grant permission
|
||||
permissions.utils.grant_permission(self.page_1, self.role_1, self.view)
|
||||
|
||||
# user has transition again
|
||||
transitions = self.private.get_allowed_transitions(self.page_1, self.user)
|
||||
self.assertEqual(len(transitions), 1)
|
||||
|
||||
class TransitionTestCase(TestCase):
|
||||
"""Tests the Transition model
|
||||
"""
|
||||
def setUp(self):
|
||||
"""
|
||||
"""
|
||||
create_workflow(self)
|
||||
|
||||
def test_unicode(self):
|
||||
"""
|
||||
"""
|
||||
self.assertEqual(self.make_private.__unicode__(), u"Make private")
|
||||
|
||||
class RelationsTestCase(TestCase):
|
||||
"""Tests various Relations models.
|
||||
"""
|
||||
def setUp(self):
|
||||
"""
|
||||
"""
|
||||
create_workflow(self)
|
||||
self.page_1 = FlatPage.objects.create(url="/page-1/", title="Page 1")
|
||||
|
||||
def test_unicode(self):
|
||||
"""
|
||||
"""
|
||||
# WorkflowObjectRelation
|
||||
workflows.utils.set_workflow(self.page_1, self.w)
|
||||
wor = WorkflowObjectRelation.objects.filter()[0]
|
||||
self.assertEqual(wor.__unicode__(), "flat page 1 - Standard")
|
||||
|
||||
# StateObjectRelation
|
||||
workflows.utils.set_state(self.page_1, self.public)
|
||||
sor = StateObjectRelation.objects.filter()[0]
|
||||
self.assertEqual(sor.__unicode__(), "flat page 1 - Public")
|
||||
|
||||
# WorkflowModelRelation
|
||||
ctype = ContentType.objects.get_for_model(self.page_1)
|
||||
workflows.utils.set_workflow(ctype, self.w)
|
||||
wmr = WorkflowModelRelation.objects.filter()[0]
|
||||
self.assertEqual(wmr.__unicode__(), "flat page - Standard")
|
||||
|
||||
# WorkflowPermissionRelation
|
||||
self.view = permissions.utils.register_permission("View", "view")
|
||||
wpr = WorkflowPermissionRelation.objects.create(workflow=self.w, permission=self.view)
|
||||
self.assertEqual(wpr.__unicode__(), "Standard View")
|
||||
|
||||
# StatePermissionRelation
|
||||
self.owner = permissions.utils.register_role("Owner")
|
||||
spr = StatePermissionRelation.objects.create(state=self.public, permission=self.view, role=self.owner)
|
||||
self.assertEqual(spr.__unicode__(), "Public Owner View")
|
||||
|
||||
# Helpers ####################################################################
|
||||
|
||||
def create_workflow(self):
|
||||
self.w = Workflow.objects.create(name="Standard")
|
||||
|
||||
self.private = State.objects.create(name="Private", workflow= self.w)
|
||||
self.public = State.objects.create(name="Public", workflow= self.w)
|
||||
|
||||
self.make_public = Transition.objects.create(name="Make public", workflow=self.w, destination = self.public)
|
||||
self.make_private = Transition.objects.create(name="Make private", workflow=self.w, destination = self.private)
|
||||
|
||||
self.private.transitions.add(self.make_public)
|
||||
self.public.transitions.add(self.make_private)
|
||||
|
||||
self.w.initial_state = self.private
|
||||
self.w.save()
|
||||
|
||||
# Taken from "http://www.djangosnippets.org/snippets/963/"
|
||||
class RequestFactory(Client):
|
||||
"""
|
||||
Class that lets you create mock Request objects for use in testing.
|
||||
|
||||
Usage:
|
||||
|
||||
rf = RequestFactory()
|
||||
get_request = rf.get('/hello/')
|
||||
post_request = rf.post('/submit/', {'foo': 'bar'})
|
||||
|
||||
This class re-uses the django.test.client.Client interface, docs here:
|
||||
http://www.djangoproject.com/documentation/testing/#the-test-client
|
||||
|
||||
Once you have a request object you can pass it to any view function,
|
||||
just as if that view had been hooked up using a URLconf.
|
||||
|
||||
"""
|
||||
def request(self, **request):
|
||||
"""
|
||||
Similar to parent class, but returns the request object as soon as it
|
||||
has created it.
|
||||
"""
|
||||
environ = {
|
||||
'HTTP_COOKIE': self.cookies,
|
||||
'PATH_INFO': '/',
|
||||
'QUERY_STRING': '',
|
||||
'REQUEST_METHOD': 'GET',
|
||||
'SCRIPT_NAME': '',
|
||||
'SERVER_NAME': 'testserver',
|
||||
'SERVER_PORT': 80,
|
||||
'SERVER_PROTOCOL': 'HTTP/1.1',
|
||||
}
|
||||
environ.update(self.defaults)
|
||||
environ.update(request)
|
||||
return WSGIRequest(environ)
|
||||
|
||||
def create_request():
|
||||
"""
|
||||
"""
|
||||
rf = RequestFactory()
|
||||
request = rf.get('/')
|
||||
request.session = SessionStore()
|
||||
|
||||
user = User()
|
||||
user.is_superuser = True
|
||||
user.save()
|
||||
request.user = user
|
||||
|
||||
return request
|
7
workflows/urls.py
Normal file
7
workflows/urls.py
Normal file
|
@ -0,0 +1,7 @@
|
|||
from django.conf.urls.defaults import *
|
||||
|
||||
# URL patterns for django-workflows
|
||||
|
||||
urlpatterns = patterns('django-workflows.views',
|
||||
# Add url patterns here
|
||||
)
|
330
workflows/utils.py
Normal file
330
workflows/utils.py
Normal file
|
@ -0,0 +1,330 @@
|
|||
# django imports
|
||||
from django.contrib.contenttypes.models import ContentType
|
||||
|
||||
# workflows imports
|
||||
from permissions.models import ObjectPermission
|
||||
from permissions.models import ObjectPermissionInheritanceBlock
|
||||
from workflows.models import StateInheritanceBlock
|
||||
from workflows.models import StateObjectRelation
|
||||
from workflows.models import StatePermissionRelation
|
||||
from workflows.models import Transition
|
||||
from workflows.models import Workflow
|
||||
from workflows.models import WorkflowModelRelation
|
||||
from workflows.models import WorkflowObjectRelation
|
||||
from workflows.models import WorkflowPermissionRelation
|
||||
|
||||
# permissions imports
|
||||
import permissions.utils
|
||||
|
||||
def get_objects_for_workflow(workflow):
|
||||
"""Returns all objects which have passed workflow.
|
||||
|
||||
**Parameters:**
|
||||
|
||||
workflow
|
||||
The workflow for which the objects are returned. Can be a Workflow
|
||||
instance or a string with the workflow name.
|
||||
"""
|
||||
if not isinstance(workflow, Workflow):
|
||||
try:
|
||||
workflow = Workflow.objects.get(name=workflow)
|
||||
except Workflow.DoesNotExist:
|
||||
return []
|
||||
|
||||
return workflow.get_objects()
|
||||
|
||||
def remove_workflow(ctype_or_obj):
|
||||
"""Removes the workflow from the passed content type or object. After this
|
||||
function has been called the content type or object has no workflow
|
||||
anymore.
|
||||
|
||||
If ctype_or_obj is an object the workflow is removed from the object not
|
||||
from the belonging content type.
|
||||
|
||||
If ctype_or_obj is an content type the workflow is removed from the
|
||||
content type not from instances of the content type (if they have an own
|
||||
workflow)
|
||||
|
||||
ctype_or_obj
|
||||
The content type or the object to which the passed workflow should be
|
||||
set. Can be either a ContentType instance or any LFC Django model
|
||||
instance.
|
||||
"""
|
||||
if isinstance(ctype_or_obj, ContentType):
|
||||
remove_workflow_from_model(ctype_or_obj)
|
||||
else:
|
||||
remove_workflow_from_object(ctype_or_obj)
|
||||
|
||||
def remove_workflow_from_model(ctype):
|
||||
"""Removes the workflow from passed content type. After this function has
|
||||
been called the content type has no workflow anymore (the instances might
|
||||
have own ones).
|
||||
|
||||
ctype
|
||||
The content type from which the passed workflow should be removed.
|
||||
Must be a ContentType instance.
|
||||
"""
|
||||
# First delete all states, inheritance blocks and permissions from ctype's
|
||||
# instances which have passed workflow.
|
||||
workflow = get_workflow_for_model(ctype)
|
||||
for obj in get_objects_for_workflow(workflow):
|
||||
try:
|
||||
ctype = ContentType.objects.get_for_model(obj)
|
||||
sor = StateObjectRelation.objects.get(content_id=obj.id, content_type=ctype)
|
||||
except StateObjectRelation.DoesNotExist:
|
||||
pass
|
||||
else:
|
||||
sor.delete()
|
||||
|
||||
# Reset all permissions
|
||||
permissions.utils.reset(obj)
|
||||
|
||||
try:
|
||||
wmr = WorkflowModelRelation.objects.get(content_type=ctype)
|
||||
except WorkflowModelRelation.DoesNotExist:
|
||||
pass
|
||||
else:
|
||||
wmr.delete()
|
||||
|
||||
def remove_workflow_from_object(obj):
|
||||
"""Removes the workflow from the passed object. After this function has
|
||||
been called the object has no *own* workflow anymore (it might have one
|
||||
via its content type).
|
||||
|
||||
obj
|
||||
The object from which the passed workflow should be set. Must be a
|
||||
Django Model instance.
|
||||
"""
|
||||
try:
|
||||
wor = WorkflowObjectRelation.objects.get(content_type=obj)
|
||||
except WorkflowObjectRelation.DoesNotExist:
|
||||
pass
|
||||
else:
|
||||
wor.delete()
|
||||
|
||||
# Reset all permissions
|
||||
permissions.utils.reset(obj)
|
||||
|
||||
# Set initial of object's content types workflow (if there is one)
|
||||
set_initial_state(obj)
|
||||
|
||||
def set_workflow(ctype_or_obj, workflow):
|
||||
"""Sets the workflow for passed content type or object. See the specific
|
||||
methods for more information.
|
||||
|
||||
**Parameters:**
|
||||
|
||||
workflow
|
||||
The workflow which should be set to the object or model.
|
||||
|
||||
ctype_or_obj
|
||||
The content type or the object to which the passed workflow should be
|
||||
set. Can be either a ContentType instance or any Django model
|
||||
instance.
|
||||
"""
|
||||
return workflow.set_to(ctype_or_obj)
|
||||
|
||||
def set_workflow_for_object(obj, workflow):
|
||||
"""Sets the passed workflow to the passed object.
|
||||
|
||||
If the object has already the given workflow nothing happens. Otherwise
|
||||
the object gets the passed workflow and the state is set to the workflow's
|
||||
initial state.
|
||||
|
||||
**Parameters:**
|
||||
|
||||
workflow
|
||||
The workflow which should be set to the object. Can be a Workflow
|
||||
instance or a string with the workflow name.
|
||||
|
||||
obj
|
||||
The object which gets the passed workflow.
|
||||
"""
|
||||
if isinstance(workflow, Workflow) == False:
|
||||
try:
|
||||
workflow = Workflow.objects.get(name=workflow)
|
||||
except Workflow.DoesNotExist:
|
||||
return False
|
||||
|
||||
workflow.set_to_object(obj)
|
||||
|
||||
def set_workflow_for_model(ctype, workflow):
|
||||
"""Sets the passed workflow to the passed content type. If the content
|
||||
type has already an assigned workflow the workflow is overwritten.
|
||||
|
||||
The objects which had the old workflow must updated explicitely.
|
||||
|
||||
**Parameters:**
|
||||
|
||||
workflow
|
||||
The workflow which should be set to passend content type. Must be a
|
||||
Workflow instance.
|
||||
|
||||
ctype
|
||||
The content type to which the passed workflow should be assigned. Can
|
||||
be any Django model instance
|
||||
"""
|
||||
if isinstance(workflow, Workflow) == False:
|
||||
try:
|
||||
workflow = Workflow.objects.get(name=workflow)
|
||||
except Workflow.DoesNotExist:
|
||||
return False
|
||||
|
||||
workflow.set_to_model(ctype)
|
||||
|
||||
def get_workflow(obj):
|
||||
"""Returns the workflow for the passed object. It takes it either from
|
||||
the passed object or - if the object doesn't have a workflow - from the
|
||||
passed object's ContentType.
|
||||
|
||||
**Parameters:**
|
||||
|
||||
object
|
||||
The object for which the workflow should be returend. Can be any
|
||||
Django model instance.
|
||||
"""
|
||||
workflow = get_workflow_for_object(obj)
|
||||
if workflow is not None:
|
||||
return workflow
|
||||
|
||||
ctype = ContentType.objects.get_for_model(obj)
|
||||
return get_workflow_for_model(ctype)
|
||||
|
||||
def get_workflow_for_object(obj):
|
||||
"""Returns the workflow for the passed object.
|
||||
|
||||
**Parameters:**
|
||||
|
||||
obj
|
||||
The object for which the workflow should be returned. Can be any
|
||||
Django model instance.
|
||||
"""
|
||||
try:
|
||||
ctype = ContentType.objects.get_for_model(obj)
|
||||
wor = WorkflowObjectRelation.objects.get(content_id=obj.id, content_type=ctype)
|
||||
except WorkflowObjectRelation.DoesNotExist:
|
||||
return None
|
||||
else:
|
||||
return wor.workflow
|
||||
|
||||
def get_workflow_for_model(ctype):
|
||||
"""Returns the workflow for the passed model.
|
||||
|
||||
**Parameters:**
|
||||
|
||||
ctype
|
||||
The content type for which the workflow should be returned. Must be
|
||||
a Django ContentType instance.
|
||||
"""
|
||||
try:
|
||||
wor = WorkflowModelRelation.objects.get(content_type=ctype)
|
||||
except WorkflowModelRelation.DoesNotExist:
|
||||
return None
|
||||
else:
|
||||
return wor.workflow
|
||||
|
||||
def get_state(obj):
|
||||
"""Returns the current workflow state for the passed object.
|
||||
|
||||
**Parameters:**
|
||||
|
||||
obj
|
||||
The object for which the workflow state should be returned. Can be any
|
||||
Django model instance.
|
||||
"""
|
||||
ctype = ContentType.objects.get_for_model(obj)
|
||||
try:
|
||||
sor = StateObjectRelation.objects.get(content_type=ctype, content_id=obj.id)
|
||||
except StateObjectRelation.DoesNotExist:
|
||||
return None
|
||||
else:
|
||||
return sor.state
|
||||
|
||||
def set_state(obj, state):
|
||||
"""Sets the state for the passed object to the passed state and updates
|
||||
the permissions for the object.
|
||||
|
||||
**Parameters:**
|
||||
|
||||
obj
|
||||
The object for which the workflow state should be set. Can be any
|
||||
Django model instance.
|
||||
|
||||
state
|
||||
The state which should be set to the passed object.
|
||||
"""
|
||||
ctype = ContentType.objects.get_for_model(obj)
|
||||
try:
|
||||
sor = StateObjectRelation.objects.get(content_type=ctype, content_id=obj.id)
|
||||
except StateObjectRelation.DoesNotExist:
|
||||
sor = StateObjectRelation.objects.create(content=obj, state=state)
|
||||
else:
|
||||
sor.state = state
|
||||
sor.save()
|
||||
update_permissions(obj)
|
||||
|
||||
def set_initial_state(obj):
|
||||
"""Sets the initial state to the passed object.
|
||||
"""
|
||||
wf = get_workflow(obj)
|
||||
if wf is not None:
|
||||
set_state(obj, wf.get_initial_state())
|
||||
|
||||
def get_allowed_transitions(obj, user):
|
||||
"""Returns all allowed transitions for passed object and user. Takes the
|
||||
current state of the object into account.
|
||||
|
||||
**Parameters:**
|
||||
|
||||
obj
|
||||
The object for which the transitions should be returned.
|
||||
|
||||
user
|
||||
The user for which the transitions are allowed.
|
||||
"""
|
||||
state = get_state(obj)
|
||||
if state is None:
|
||||
return []
|
||||
|
||||
return state.get_allowed_transitions(obj, user)
|
||||
|
||||
def do_transition(obj, transition, user):
|
||||
"""Processes the passed transition to the passed object (if allowed).
|
||||
"""
|
||||
if not isinstance(transition, Transition):
|
||||
try:
|
||||
transition = Transition.objects.get(name=transition)
|
||||
except Transition.DoesNotExist:
|
||||
return False
|
||||
|
||||
transitions = get_allowed_transitions(obj, user)
|
||||
if transition in transitions:
|
||||
set_state(obj, transition.destination)
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
def update_permissions(obj):
|
||||
"""Updates the permissions of the passed object according to the object's
|
||||
current workflow state.
|
||||
"""
|
||||
workflow = get_workflow(obj)
|
||||
state = get_state(obj)
|
||||
|
||||
# Remove all permissions for the workflow
|
||||
ct = ContentType.objects.get_for_model(obj)
|
||||
ps = [wpr.permission for wpr in WorkflowPermissionRelation.objects.filter(workflow=workflow)]
|
||||
|
||||
ObjectPermission.objects.filter(content_type = ct, content_id=obj.id, permission__in=ps).delete()
|
||||
|
||||
# Grant permission for the state
|
||||
for spr in StatePermissionRelation.objects.filter(state=state):
|
||||
permissions.utils.grant_permission(obj, spr.role, spr.permission)
|
||||
|
||||
# Remove all inheritance blocks from the object
|
||||
ObjectPermissionInheritanceBlock.objects.filter(
|
||||
content_type = ct, content_id=obj.id, permission__in=ps).delete()
|
||||
|
||||
# Add inheritance blocks of this state to the object
|
||||
for sib in StateInheritanceBlock.objects.filter(state=state):
|
||||
permissions.utils.add_inheritance_block(obj, sib.permission)
|
0
workflows/views.py
Normal file
0
workflows/views.py
Normal file
Loading…
Reference in a new issue