"""
Changes to the content of one single LDAP entry.
(This means these do not belong here: adding or deleting of entries,
changing of location in tree)
"""
from ldaptor import attributeset
from ldaptor.protocols import pureldap, pureber
from ldaptor.protocols.ldap import ldif, distinguishedname
[docs]class Modification(attributeset.LDAPAttributeSet):
[docs] def patch(self, entry):
raise NotImplementedError("%s.patch not implemented" % self.__class__.__name__)
_LDAP_OP = None
[docs] def asLDAP(self):
if self._LDAP_OP is None:
raise NotImplementedError(
"%s.asLDAP not implemented" % self.__class__.__name__
)
tmplist = list(self)
newlist = []
for x in range(len(tmplist)):
if isinstance(tmplist[x], str):
value = tmplist[x].encode("utf-8")
newlist.append(value)
else:
value = tmplist[x]
newlist.append(value)
return pureber.BERSequence(
[
pureber.BEREnumerated(self._LDAP_OP),
pureber.BERSequence(
[
pureldap.LDAPAttributeDescription(self.key),
pureber.BERSet(map(pureldap.LDAPString, newlist)),
]
),
]
).toWire()
def __eq__(self, other):
if not isinstance(other, self.__class__):
return False
return super().__eq__(other)
[docs]class Add(Modification):
_LDAP_OP = 0
[docs] def patch(self, entry):
if self.key in entry:
entry[self.key].update(self)
else:
entry[self.key] = self
[docs] def asLDIF(self):
r = []
values = list(self)
values.sort()
r.append(ldif.attributeAsLDIF("add", self.key))
for v in values:
r.append(ldif.attributeAsLDIF(self.key, v))
r.append(b"-\n")
return b"".join(r)
[docs]class Delete(Modification):
_LDAP_OP = 1
[docs] def patch(self, entry):
if not self:
del entry[self.key]
else:
for v in self:
entry[self.key].remove(v)
[docs] def asLDIF(self):
r = []
values = list(self)
values.sort()
r.append(ldif.attributeAsLDIF("delete", self.key))
for v in values:
r.append(ldif.attributeAsLDIF(self.key, v))
r.append(b"-\n")
return b"".join(r)
[docs]class Replace(Modification):
_LDAP_OP = 2
[docs] def patch(self, entry):
if self:
entry[self.key] = self
else:
try:
del entry[self.key]
except KeyError:
pass
[docs] def asLDIF(self):
r = []
values = list(self)
values.sort()
r.append(ldif.attributeAsLDIF("replace", self.key))
for v in values:
r.append(ldif.attributeAsLDIF(self.key, v))
r.append(b"-\n")
return b"".join(r)
[docs]class Operation:
[docs] def patch(self, root):
"""
Find the correct entry in IConnectedLDAPEntry and patch it.
@param root: IConnectedLDAPEntry that is at the root of the
subtree the patch applies to.
@returns: Deferred with None or failure.
"""
raise NotImplementedError("%s.patch not implemented" % self.__class__.__name__)
[docs]class ModifyOp(Operation):
def __init__(self, dn, modifications=[]):
if not isinstance(dn, distinguishedname.DistinguishedName):
dn = distinguishedname.DistinguishedName(stringValue=dn)
self.dn = dn
self.modifications = modifications[:]
[docs] def asLDIF(self):
r = []
r.append(ldif.attributeAsLDIF("dn", self.dn.getText()))
r.append(ldif.attributeAsLDIF("changetype", "modify"))
for m in self.modifications:
r.append(m.asLDIF())
r.append(b"\n")
return b"".join(r)
[docs] def asLDAP(self):
return pureldap.LDAPModifyRequest(
object=self.dn.getText(),
modification=[x.asLDAP() for x in self.modifications],
)
@classmethod
def _getClassFromOp(class_, op):
for mod in [Add, Delete, Replace]:
if op == mod._LDAP_OP:
return mod
return None
[docs] @classmethod
def fromLDAP(class_, request):
if not isinstance(request, pureldap.LDAPModifyRequest):
raise RuntimeError(
"%s.fromLDAP needs an LDAPModifyRequest" % class_.__name__
)
dn = request.object
result = []
for op, mods in request.modification:
op = op.value
klass = class_._getClassFromOp(op)
if klass is None:
raise RuntimeError(
"Unknown LDAP op number {!r} in {}.fromLDAP".format(
op, class_.__name__
)
)
key, vals = mods
key = key.value
vals = [x.value for x in vals]
m = klass(key, vals)
result.append(m)
return class_(dn, result)
[docs] def patch(self, root):
d = root.lookup(self.dn)
def gotEntry(entry, modifications):
for mod in self.modifications:
mod.patch(entry)
return entry
d.addCallback(gotEntry, self.modifications)
return d
def __repr__(self):
dn = self.dn.getText()
return (
self.__class__.__name__
+ "("
+ "dn=%r" % dn
+ ", "
+ "modifications=%r" % self.modifications
+ ")"
)
def __eq__(self, other):
if not isinstance(other, self.__class__):
return NotImplemented
if self.dn != other.dn:
return 0
if self.modifications != other.modifications:
return 0
return 1
def __hash__(self):
# We use the LDIF representation as similar objects
# should have the same LDIF.
return hash(self.asLDIF())
def __ne__(self, other):
return not self == other
[docs]class AddOp(Operation):
def __init__(self, entry):
self.entry = entry
[docs] def asLDIF(self):
l = self.entry.toWire().splitlines()
assert l[0].startswith(b"dn:")
l[1:1] = [ldif.attributeAsLDIF("changetype", "add").rstrip(b"\n")]
return b"".join([x + b"\n" for x in l])
[docs] def patch(self, root):
d = root.lookup(self.entry.dn.up())
def gotParent(parent, entry):
parent.addChild(entry.dn.split()[0], entry)
d.addCallback(gotParent, self.entry)
return d
def __repr__(self):
return self.__class__.__name__ + "(" + "%r" % self.entry + ")"
def __eq__(self, other):
if not isinstance(other, self.__class__):
return NotImplemented
if self.entry != other.entry:
return False
return True
def __ne__(self, other):
return not self == other
def __hash__(self):
# Use the LDIF representions as equal operations should
# have the same LDIF.
return hash(self.asLDIF())
[docs]class DeleteOp(Operation):
def __init__(self, dn):
"""
Instance can be initialized with different objects:
* ldaptor.entry.BaseLDAPEntry instance
* ldaptor.protocols.ldap.distinguishedname.DistinguishedName instance
* unicode or byte string
"""
if hasattr(dn, "dn") and isinstance(dn.dn, distinguishedname.DistinguishedName):
self.dn = dn.dn
elif isinstance(dn, distinguishedname.DistinguishedName):
self.dn = dn
elif isinstance(dn, (bytes, str)):
self.dn = distinguishedname.DistinguishedName(stringValue=dn)
else:
raise AssertionError("Invalid type of object: %s" % dn.__class__.__name__)
[docs] def asLDIF(self):
r = []
r.append(ldif.attributeAsLDIF("dn", self.dn.getText()))
r.append(ldif.attributeAsLDIF("changetype", "delete"))
r.append(b"\n")
return b"".join(r)
[docs] def patch(self, root):
d = root.lookup(self.dn)
def gotEntry(entry):
return entry.delete()
d.addCallback(gotEntry)
return d
def __repr__(self):
dn = self.dn.getText()
return self.__class__.__name__ + "(" + "%r" % dn + ")"
def __eq__(self, other):
if not isinstance(other, self.__class__):
return NotImplemented
if self.dn != other.dn:
return False
return True
def __ne__(self, other):
return not self == other
def __hash__(self):
return hash(self.dn)