Source code for ldaptor.delta

"""
Changes to the content of one single LDAP entry.

(This means the following do not belong here: adding or deleting
entries, or changing an entry's location in the tree.)
"""

from ldaptor import attributeset
from ldaptor.protocols import pureldap, pureber
from ldaptor.protocols.ldap import ldif, distinguishedname


class Modification(attributeset.LDAPAttributeSet):
    """
    Base class for one change to a single attribute of an LDAP entry.

    Subclasses provide an operation number in C{_LDAP_OP} and override
    L{patch}.
    """

    # Operation number written into the encoded modification. While it is
    # None, asLDAP() refuses to serialize.
    _LDAP_OP = None

    def patch(self, entry):
        """
        Apply this modification to C{entry}.

        @raise NotImplementedError: always; subclasses must override.
        """
        raise NotImplementedError("%s.patch not implemented" % self.__class__.__name__)

    def asLDAP(self):
        """
        Encode this modification as a BER sequence holding the operation
        number followed by a sequence of the attribute description and
        the set of values.

        @returns: the result of C{toWire()} on that BER sequence.
        @raise NotImplementedError: if C{_LDAP_OP} is None.
        """
        if self._LDAP_OP is None:
            raise NotImplementedError(
                "%s.asLDAP not implemented" % self.__class__.__name__
            )

        # str values are encoded as UTF-8; anything else is passed through
        # unchanged.
        encoded = [
            item.encode("utf-8") if isinstance(item, str) else item
            for item in self
        ]

        operation = pureber.BEREnumerated(self._LDAP_OP)
        attribute = pureber.BERSequence(
            [
                pureldap.LDAPAttributeDescription(self.key),
                pureber.BERSet(map(pureldap.LDAPString, encoded)),
            ]
        )
        return pureber.BERSequence([operation, attribute]).toWire()

    def __eq__(self, other):
        """
        Equal only to instances of this object's own class (or a subclass
        of it) for which the inherited comparison also holds.
        """
        if isinstance(other, self.__class__):
            return super().__eq__(other)
        return False
class Add(Modification):
    """Modification that adds this set's values to an attribute."""

    _LDAP_OP = 0

    def patch(self, entry):
        """
        Add the values to C{entry}.

        When the attribute is absent, this object itself is stored under
        the key; otherwise the existing attribute is updated with the
        values.
        """
        if self.key not in entry:
            entry[self.key] = self
            return
        entry[self.key].update(self)

    def asLDIF(self):
        """
        Render as an LDIF "add" block: the header line, one line per
        value in sorted order, then a terminating C{-} line.

        @returns: the concatenated bytes.
        """
        ordered = sorted(self)
        header = ldif.attributeAsLDIF("add", self.key)
        value_lines = [ldif.attributeAsLDIF(self.key, item) for item in ordered]
        return b"".join([header] + value_lines + [b"-\n"])
class Delete(Modification):
    """Modification that deletes values from, or all of, an attribute."""

    _LDAP_OP = 1

    def patch(self, entry):
        """
        Remove values from C{entry}.

        With values present, each one is removed from the entry's
        attribute; an empty Delete removes the whole attribute.
        """
        if self:
            for item in self:
                # Looked up on every iteration, exactly as many times as
                # there are values to remove.
                entry[self.key].remove(item)
        else:
            del entry[self.key]

    def asLDIF(self):
        """
        Render as an LDIF "delete" block: the header line, one line per
        value in sorted order, then a terminating C{-} line.

        @returns: the concatenated bytes.
        """
        ordered = sorted(self)
        header = ldif.attributeAsLDIF("delete", self.key)
        value_lines = [ldif.attributeAsLDIF(self.key, item) for item in ordered]
        return b"".join([header] + value_lines + [b"-\n"])
class Replace(Modification):
    """Modification that replaces the values of an attribute."""

    _LDAP_OP = 2

    def patch(self, entry):
        """
        Replace the attribute in C{entry}.

        With values present, this object itself is stored under the key.
        An empty Replace deletes the attribute, and a missing attribute
        is silently accepted.
        """
        if self:
            entry[self.key] = self
            return
        try:
            del entry[self.key]
        except KeyError:
            # Nothing to remove: the attribute was already absent.
            pass

    def asLDIF(self):
        """
        Render as an LDIF "replace" block: the header line, one line per
        value in sorted order, then a terminating C{-} line.

        @returns: the concatenated bytes.
        """
        ordered = sorted(self)
        header = ldif.attributeAsLDIF("replace", self.key)
        value_lines = [ldif.attributeAsLDIF(self.key, item) for item in ordered]
        return b"".join([header] + value_lines + [b"-\n"])
class Operation:
    """Base class for operations that can be patched into an LDAP tree."""

    def patch(self, root):
        """
        Find the correct entry in IConnectedLDAPEntry and patch it.

        @param root: IConnectedLDAPEntry that is at the root of the
        subtree the patch applies to.

        @returns: Deferred with None or failure.

        @raise NotImplementedError: always in this base class; subclasses
        must override.
        """
        message = "%s.patch not implemented" % self.__class__.__name__
        raise NotImplementedError(message)
class ModifyOp(Operation):
    """
    A list of modifications to apply to the entry at one distinguished
    name.
    """

    def __init__(self, dn, modifications=[]):
        """
        @param dn: a DistinguishedName, or a value passed as
        C{stringValue} to build one.

        @param modifications: sequence of modifications. It is copied
        with a slice, so the shared default list is never handed out to
        the instance.
        """
        if not isinstance(dn, distinguishedname.DistinguishedName):
            dn = distinguishedname.DistinguishedName(stringValue=dn)
        self.dn = dn
        self.modifications = modifications[:]

    def asLDIF(self):
        """
        Render as an LDIF "modify" change record: the dn line, the
        changetype line, the LDIF of every modification in order, and a
        final blank line.

        @returns: the concatenated bytes.
        """
        r = [
            ldif.attributeAsLDIF("dn", self.dn.getText()),
            ldif.attributeAsLDIF("changetype", "modify"),
        ]
        r.extend(m.asLDIF() for m in self.modifications)
        r.append(b"\n")
        return b"".join(r)

    def asLDAP(self):
        """
        Build an LDAPModifyRequest from the DN text and the C{asLDAP()}
        form of every modification.
        """
        return pureldap.LDAPModifyRequest(
            object=self.dn.getText(),
            modification=[x.asLDAP() for x in self.modifications],
        )

    @classmethod
    def _getClassFromOp(class_, op):
        """
        Map an operation number to the Modification subclass whose
        C{_LDAP_OP} equals it.

        @returns: the class, or None when no class matches.
        """
        for mod in [Add, Delete, Replace]:
            if op == mod._LDAP_OP:
                return mod
        return None

    @classmethod
    def fromLDAP(class_, request):
        """
        Build an instance from an LDAPModifyRequest.

        Each item of C{request.modification} is unpacked as an
        C{(op, mods)} pair, where C{op.value} is the operation number and
        C{mods} unpacks to a key and its values, each read through
        C{.value}.

        @raise RuntimeError: if C{request} is not an LDAPModifyRequest or
        an operation number is unknown.
        """
        if not isinstance(request, pureldap.LDAPModifyRequest):
            raise RuntimeError(
                "%s.fromLDAP needs an LDAPModifyRequest" % class_.__name__
            )
        dn = request.object
        result = []
        for op, mods in request.modification:
            op = op.value
            klass = class_._getClassFromOp(op)
            if klass is None:
                raise RuntimeError(
                    "Unknown LDAP op number {!r} in {}.fromLDAP".format(
                        op, class_.__name__
                    )
                )

            key, vals = mods
            key = key.value
            vals = [x.value for x in vals]
            result.append(klass(key, vals))
        return class_(dn, result)

    def patch(self, root):
        """
        Look up C{self.dn} under C{root} and apply every modification to
        the entry found.

        @param root: object whose C{lookup(dn)} result accepts
        C{addCallback} (presumably a Deferred - confirm against the
        IConnectedLDAPEntry implementation in use).

        @returns: the object returned by C{root.lookup}; its callback
        chain yields the patched entry.
        """
        d = root.lookup(self.dn)

        def gotEntry(entry):
            # self.modifications is read when the callback fires.
            for mod in self.modifications:
                mod.patch(entry)
            return entry

        d.addCallback(gotEntry)
        return d

    def __repr__(self):
        # Tuple-wrapped arguments keep the formatting correct whatever
        # sequence type self.modifications happens to be.
        return "%s(dn=%r, modifications=%r)" % (
            self.__class__.__name__,
            self.dn.getText(),
            self.modifications,
        )

    def __eq__(self, other):
        """
        Equal when C{other} is an instance of this object's class with the
        same DN and the same modifications.

        @returns: a bool, or NotImplemented for unrelated types.
        """
        if not isinstance(other, self.__class__):
            return NotImplemented
        if self.dn != other.dn:
            return False
        if self.modifications != other.modifications:
            return False
        return True

    def __hash__(self):
        # We use the LDIF representation as similar objects
        # should have the same LDIF.
        return hash(self.asLDIF())

    def __ne__(self, other):
        return not self == other
class AddOp(Operation):
    """Operation that adds one entry below its parent."""

    def __init__(self, entry):
        """
        @param entry: the entry to add. This class reads its C{dn} and
        calls its C{toWire()}.
        """
        self.entry = entry

    def asLDIF(self):
        """
        Render as an LDIF "add" change record: the entry's own wire form
        with a changetype line inserted right after the first line.

        @returns: the bytes, every line terminated by a newline.
        """
        lines = self.entry.toWire().splitlines()
        assert lines[0].startswith(b"dn:")
        changetype = ldif.attributeAsLDIF("changetype", "add").rstrip(b"\n")
        lines.insert(1, changetype)
        return b"".join(line + b"\n" for line in lines)

    def patch(self, root):
        """
        Look up the parent of the entry's DN under C{root} and add the
        entry to it as a child.

        @returns: the object returned by C{root.lookup}. The callback
        added here returns nothing, so the chain continues with None.
        """

        def gotParent(parent, entry):
            rdn = entry.dn.split()[0]
            parent.addChild(rdn, entry)

        d = root.lookup(self.entry.dn.up())
        d.addCallback(gotParent, self.entry)
        return d

    def __repr__(self):
        return "%s(%r)" % (self.__class__.__name__, self.entry)

    def __eq__(self, other):
        """
        Equal when C{other} is an instance of this object's class holding
        an entry that does not compare unequal to this one's.
        """
        if isinstance(other, self.__class__):
            return not (self.entry != other.entry)
        return NotImplemented

    def __ne__(self, other):
        return not self == other

    def __hash__(self):
        # Equal operations should produce the same LDIF, so the LDIF
        # representation is what gets hashed.
        return hash(self.asLDIF())
class DeleteOp(Operation):
    """Operation that deletes the entry at one distinguished name."""

    def __init__(self, dn):
        """
        Instance can be initialized with different objects:

        * ldaptor.entry.BaseLDAPEntry instance
        * ldaptor.protocols.ldap.distinguishedname.DistinguishedName instance
        * unicode or byte string

        @raise AssertionError: if C{dn} is none of the above.
        """
        dn_class = distinguishedname.DistinguishedName
        # An object carrying a DistinguishedName in its own "dn"
        # attribute is checked first.
        if hasattr(dn, "dn") and isinstance(dn.dn, dn_class):
            resolved = dn.dn
        elif isinstance(dn, dn_class):
            resolved = dn
        elif isinstance(dn, (bytes, str)):
            resolved = dn_class(stringValue=dn)
        else:
            raise AssertionError("Invalid type of object: %s" % dn.__class__.__name__)
        self.dn = resolved

    def asLDIF(self):
        """
        Render as an LDIF "delete" change record: the dn line, the
        changetype line and a final blank line.

        @returns: the concatenated bytes.
        """
        return b"".join(
            [
                ldif.attributeAsLDIF("dn", self.dn.getText()),
                ldif.attributeAsLDIF("changetype", "delete"),
                b"\n",
            ]
        )

    def patch(self, root):
        """
        Look up C{self.dn} under C{root} and call C{delete()} on the
        entry found.

        @returns: the object returned by C{root.lookup}; its callback
        chain continues with whatever C{entry.delete()} returns.
        """
        d = root.lookup(self.dn)
        d.addCallback(lambda entry: entry.delete())
        return d

    def __repr__(self):
        return "%s(%r)" % (self.__class__.__name__, self.dn.getText())

    def __eq__(self, other):
        """
        Equal when C{other} is an instance of this object's class whose DN
        does not compare unequal to this one's.
        """
        if isinstance(other, self.__class__):
            return not (self.dn != other.dn)
        return NotImplemented

    def __ne__(self, other):
        return not self == other

    def __hash__(self):
        return hash(self.dn)