Source code for ldaptor.delta

"""
Changes to the content of one single LDAP entry.

(This means these do not belong here: adding or deleting of entries,
changing of location in tree)
"""

from ldaptor import attributeset
from ldaptor.protocols import pureldap, pureber
from ldaptor.protocols.ldap import ldif, distinguishedname


[docs]class Modification(attributeset.LDAPAttributeSet):
[docs] def patch(self, entry): raise NotImplementedError( '%s.patch not implemented' % self.__class__.__name__)
_LDAP_OP = None
[docs] def asLDAP(self): if self._LDAP_OP is None: raise NotImplementedError("%s.asLDAP not implemented" % self.__class__.__name__) tmplist = list(self) newlist = [] for x in range(len(tmplist)): if (isinstance(tmplist[x], str)): value = tmplist[x].encode('utf-8') newlist.append(value) else: value = tmplist[x] newlist.append(value) return pureber.BERSequence([ pureber.BEREnumerated(self._LDAP_OP), pureber.BERSequence([ pureldap.LDAPAttributeDescription(self.key), pureber.BERSet(map(pureldap.LDAPString, newlist)), ]), ]).toWire()
def __eq__(self, other): if not isinstance(other, self.__class__): return False return super().__eq__(other)
[docs]class Add(Modification): _LDAP_OP = 0
[docs] def patch(self, entry): if self.key in entry: entry[self.key].update(self) else: entry[self.key] = self
[docs] def asLDIF(self): r=[] values = list(self) values.sort() r.append(ldif.attributeAsLDIF('add', self.key)) for v in values: r.append(ldif.attributeAsLDIF(self.key, v)) r.append(b'-\n') return b''.join(r)
[docs]class Delete(Modification): _LDAP_OP = 1
[docs] def patch(self, entry): if not self: del entry[self.key] else: for v in self: entry[self.key].remove(v)
[docs] def asLDIF(self): r=[] values = list(self) values.sort() r.append(ldif.attributeAsLDIF('delete', self.key)) for v in values: r.append(ldif.attributeAsLDIF(self.key, v)) r.append(b'-\n') return b''.join(r)
[docs]class Replace(Modification): _LDAP_OP = 2
[docs] def patch(self, entry): if self: entry[self.key] = self else: try: del entry[self.key] except KeyError: pass
[docs] def asLDIF(self): r=[] values = list(self) values.sort() r.append(ldif.attributeAsLDIF('replace', self.key)) for v in values: r.append(ldif.attributeAsLDIF(self.key, v)) r.append(b'-\n') return b''.join(r)
[docs]class Operation:
[docs] def patch(self, root): """ Find the correct entry in IConnectedLDAPEntry and patch it. @param root: IConnectedLDAPEntry that is at the root of the subtree the patch applies to. @returns: Deferred with None or failure. """ raise NotImplementedError( '%s.patch not implemented' % self.__class__.__name__)
[docs]class ModifyOp(Operation): def __init__(self, dn, modifications=[]): if not isinstance(dn, distinguishedname.DistinguishedName): dn=distinguishedname.DistinguishedName(stringValue=dn) self.dn = dn self.modifications = modifications[:]
[docs] def asLDIF(self): r = [] r.append(ldif.attributeAsLDIF('dn', self.dn.getText())) r.append(ldif.attributeAsLDIF('changetype', 'modify')) for m in self.modifications: r.append(m.asLDIF()) r.append(b"\n") return b''.join(r)
[docs] def asLDAP(self): return pureldap.LDAPModifyRequest( object=self.dn.getText(), modification=[x.asLDAP() for x in self.modifications])
@classmethod def _getClassFromOp(class_, op): for mod in [Add, Delete, Replace]: if op == mod._LDAP_OP: return mod return None
[docs] @classmethod def fromLDAP(class_, request): if not isinstance(request, pureldap.LDAPModifyRequest): raise RuntimeError("%s.fromLDAP needs an LDAPModifyRequest" % class_.__name__) dn = request.object result = [] for op, mods in request.modification: op = op.value klass = class_._getClassFromOp(op) if klass is None: raise RuntimeError("Unknown LDAP op number %r in %s.fromLDAP" % (op, class_.__name__)) key, vals = mods key = key.value vals = [x.value for x in vals] m = klass(key, vals) result.append(m) return class_(dn, result)
[docs] def patch(self, root): d = root.lookup(self.dn) def gotEntry(entry, modifications): for mod in self.modifications: mod.patch(entry) return entry d.addCallback(gotEntry, self.modifications) return d
def __repr__(self): dn = self.dn.getText() return (self.__class__.__name__ + '(' + 'dn=%r' % dn + ', ' + 'modifications=%r' % self.modifications + ')') def __eq__(self, other): if not isinstance(other, self.__class__): return NotImplemented if self.dn != other.dn: return 0 if self.modifications != other.modifications: return 0 return 1 def __hash__(self): # We use the LDIF representation as similar objects # should have the same LDIF. return hash(self.asLDIF()) def __ne__(self, other): return not self==other
[docs]class AddOp(Operation): def __init__(self, entry): self.entry = entry
[docs] def asLDIF(self): l = self.entry.toWire().splitlines() assert l[0].startswith(b'dn:') l[1:1] = [ldif.attributeAsLDIF('changetype', 'add').rstrip(b'\n')] return b''.join([x + b'\n' for x in l])
[docs] def patch(self, root): d = root.lookup(self.entry.dn.up()) def gotParent(parent, entry): parent.addChild(entry.dn.split()[0], entry) d.addCallback(gotParent, self.entry) return d
def __repr__(self): return (self.__class__.__name__ + '(' + '%r' % self.entry + ')') def __eq__(self, other): if not isinstance(other, self.__class__): return NotImplemented if self.entry != other.entry: return False return True def __ne__(self, other): return not self == other def __hash__(self): # Use the LDIF representions as equal operations should # have the same LDIF. return hash(self.asLDIF())
[docs]class DeleteOp(Operation): def __init__(self, dn): """ Instance can be initialized with different objects: * ldaptor.entry.BaseLDAPEntry instance * ldaptor.protocols.ldap.distinguishedname.DistinguishedName instance * unicode or byte string """ if hasattr(dn, 'dn') and isinstance(dn.dn, distinguishedname.DistinguishedName): self.dn = dn.dn elif isinstance(dn, distinguishedname.DistinguishedName): self.dn = dn elif isinstance(dn, (bytes, str)): self.dn = distinguishedname.DistinguishedName(stringValue=dn) else: raise AssertionError('Invalid type of object: %s' % dn.__class__.__name__)
[docs] def asLDIF(self): r = [] r.append(ldif.attributeAsLDIF('dn', self.dn.getText())) r.append(ldif.attributeAsLDIF('changetype', 'delete')) r.append(b"\n") return b''.join(r)
[docs] def patch(self, root): d = root.lookup(self.dn) def gotEntry(entry): return entry.delete() d.addCallback(gotEntry) return d
def __repr__(self): dn = self.dn.getText() return (self.__class__.__name__ + '(' + '%r' % dn + ')') def __eq__(self, other): if not isinstance(other, self.__class__): return NotImplemented if self.dn != other.dn: return False return True def __ne__(self, other): return not self==other def __hash__(self): return hash(self.dn)