1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 """Common DNSSEC-related functions and constants."""
17
18 import cStringIO
19 import struct
20 import time
21
22 import dns.exception
23 import dns.hash
24 import dns.name
25 import dns.node
26 import dns.rdataset
27 import dns.rdata
28 import dns.rdatatype
29 import dns.rdataclass
30
32 """The DNSSEC algorithm is not supported."""
33
35 """The DNSSEC signature is invalid."""
36
37 RSAMD5 = 1
38 DH = 2
39 DSA = 3
40 ECC = 4
41 RSASHA1 = 5
42 DSANSEC3SHA1 = 6
43 RSASHA1NSEC3SHA1 = 7
44 RSASHA256 = 8
45 RSASHA512 = 10
46 ECDSAP256SHA256 = 13
47 ECDSAP384SHA384 = 14
48 INDIRECT = 252
49 PRIVATEDNS = 253
50 PRIVATEOID = 254
51
52 _algorithm_by_text = {
53 'RSAMD5' : RSAMD5,
54 'DH' : DH,
55 'DSA' : DSA,
56 'ECC' : ECC,
57 'RSASHA1' : RSASHA1,
58 'DSANSEC3SHA1' : DSANSEC3SHA1,
59 'RSASHA1NSEC3SHA1' : RSASHA1NSEC3SHA1,
60 'RSASHA256' : RSASHA256,
61 'RSASHA512' : RSASHA512,
62 'INDIRECT' : INDIRECT,
63 'ECDSAP256SHA256' : ECDSAP256SHA256,
64 'ECDSAP384SHA384' : ECDSAP384SHA384,
65 'PRIVATEDNS' : PRIVATEDNS,
66 'PRIVATEOID' : PRIVATEOID,
67 }
68
69
70
71
72
73 _algorithm_by_value = dict([(y, x) for x, y in _algorithm_by_text.iteritems()])
74
76 """Convert text into a DNSSEC algorithm value
77 @rtype: int"""
78
79 value = _algorithm_by_text.get(text.upper())
80 if value is None:
81 value = int(text)
82 return value
83
85 """Convert a DNSSEC algorithm value to text
86 @rtype: string"""
87
88 text = _algorithm_by_value.get(value)
89 if text is None:
90 text = str(value)
91 return text
92
97
110
def make_ds(name, key, algorithm, origin=None):
    """Build a DS rdata from a key.

    The digest is computed over the canonicalized wire form of the owner
    name followed by the wire form of the key rdata.

    @param name: The owner name of the key.  Text is converted with
    dns.name.from_text() relative to I{origin}.
    @type name: dns.name.Name or string
    @param key: The key rdata.  It must have an C{algorithm} attribute and be
    acceptable to _to_rdata() and key_id().
    @param algorithm: The digest to use: 'SHA1' or 'SHA256', in any case.
    @type algorithm: string
    @param origin: The origin to use for relative names
    @type origin: dns.name.Name or None
    @raises UnsupportedAlgorithm: I{algorithm} names an unsupported digest
    @return: the rdata produced by dns.rdata.from_wire() for class IN,
    type DS
    """

    digest_name = algorithm.upper()
    if digest_name == 'SHA1':
        dsalg = 1
        hasher = dns.hash.get('SHA1')()
    elif digest_name == 'SHA256':
        dsalg = 2
        hasher = dns.hash.get('SHA256')()
    else:
        # Call form of raise: same exception and message as the old
        # "raise Class, message" statement, but valid on Python 2 and 3.
        raise UnsupportedAlgorithm('unsupported algorithm "%s"' % algorithm)

    if isinstance(name, (str, unicode)):
        name = dns.name.from_text(name, origin)
    hasher.update(name.canonicalize().to_wire())
    hasher.update(_to_rdata(key, origin))
    digest = hasher.digest()

    # DS rdata wire layout: key tag (16 bits), key algorithm (8 bits),
    # digest type (8 bits), then the digest itself.
    dsrdata = struct.pack("!HBB", key_id(key), key.algorithm, dsalg) + digest
    return dns.rdata.from_wire(dns.rdataclass.IN, dns.rdatatype.DS, dsrdata, 0,
                               len(dsrdata))
130
149
154
157
160
163
167
170
173
176
189
191 if _is_md5(algorithm):
192 oid = [0x2a, 0x86, 0x48, 0x86, 0xf7, 0x0d, 0x02, 0x05]
193 elif _is_sha1(algorithm):
194 oid = [0x2b, 0x0e, 0x03, 0x02, 0x1a]
195 elif _is_sha256(algorithm):
196 oid = [0x60, 0x86, 0x48, 0x01, 0x65, 0x03, 0x04, 0x02, 0x01]
197 elif _is_sha512(algorithm):
198 oid = [0x60, 0x86, 0x48, 0x01, 0x65, 0x03, 0x04, 0x02, 0x03]
199 else:
200 raise ValidationFailure, 'unknown algorithm %u' % algorithm
201 olen = len(oid)
202 dlen = _make_hash(algorithm).digest_size
203 idbytes = [0x30] + [8 + olen + dlen] + \
204 [0x30, olen + 4] + [0x06, olen] + oid + \
205 [0x05, 0x00] + [0x04, dlen]
206 return ''.join(map(chr, idbytes))
207
209 """Validate an RRset against a single signature rdata
210
211 The owner name of the rrsig is assumed to be the same as the owner name
212 of the rrset.
213
214 @param rrset: The RRset to validate
215 @type rrset: dns.rrset.RRset or (dns.name.Name, dns.rdataset.Rdataset)
216 tuple
217 @param rrsig: The signature rdata
218 @type rrsig: dns.rrset.Rdata
219 @param keys: The key dictionary.
220 @type keys: a dictionary keyed by dns.name.Name with node or rdataset values
221 @param origin: The origin to use for relative names
222 @type origin: dns.name.Name or None
223 @param now: The time to use when validating the signatures. The default
224 is the current time.
225 @type now: int
226 """
227
228 if isinstance(origin, (str, unicode)):
229 origin = dns.name.from_text(origin, dns.name.root)
230
231 for candidate_key in _find_candidate_keys(keys, rrsig):
232 if not candidate_key:
233 raise ValidationFailure, 'unknown key'
234
235
236
237 if isinstance(rrset, tuple):
238 rrname = rrset[0]
239 rdataset = rrset[1]
240 else:
241 rrname = rrset.name
242 rdataset = rrset
243
244 if now is None:
245 now = time.time()
246 if rrsig.expiration < now:
247 raise ValidationFailure, 'expired'
248 if rrsig.inception > now:
249 raise ValidationFailure, 'not yet valid'
250
251 hash = _make_hash(rrsig.algorithm)
252
253 if _is_rsa(rrsig.algorithm):
254 keyptr = candidate_key.key
255 (bytes,) = struct.unpack('!B', keyptr[0:1])
256 keyptr = keyptr[1:]
257 if bytes == 0:
258 (bytes,) = struct.unpack('!H', keyptr[0:2])
259 keyptr = keyptr[2:]
260 rsa_e = keyptr[0:bytes]
261 rsa_n = keyptr[bytes:]
262 keylen = len(rsa_n) * 8
263 pubkey = Crypto.PublicKey.RSA.construct(
264 (Crypto.Util.number.bytes_to_long(rsa_n),
265 Crypto.Util.number.bytes_to_long(rsa_e)))
266 sig = (Crypto.Util.number.bytes_to_long(rrsig.signature),)
267 elif _is_dsa(rrsig.algorithm):
268 keyptr = candidate_key.key
269 (t,) = struct.unpack('!B', keyptr[0:1])
270 keyptr = keyptr[1:]
271 octets = 64 + t * 8
272 dsa_q = keyptr[0:20]
273 keyptr = keyptr[20:]
274 dsa_p = keyptr[0:octets]
275 keyptr = keyptr[octets:]
276 dsa_g = keyptr[0:octets]
277 keyptr = keyptr[octets:]
278 dsa_y = keyptr[0:octets]
279 pubkey = Crypto.PublicKey.DSA.construct(
280 (Crypto.Util.number.bytes_to_long(dsa_y),
281 Crypto.Util.number.bytes_to_long(dsa_g),
282 Crypto.Util.number.bytes_to_long(dsa_p),
283 Crypto.Util.number.bytes_to_long(dsa_q)))
284 (dsa_r, dsa_s) = struct.unpack('!20s20s', rrsig.signature[1:])
285 sig = (Crypto.Util.number.bytes_to_long(dsa_r),
286 Crypto.Util.number.bytes_to_long(dsa_s))
287 elif _is_ecdsa(rrsig.algorithm):
288 if rrsig.algorithm == ECDSAP256SHA256:
289 curve = ecdsa.curves.NIST256p
290 key_len = 32
291 digest_len = 32
292 elif rrsig.algorithm == ECDSAP384SHA384:
293 curve = ecdsa.curves.NIST384p
294 key_len = 48
295 digest_len = 48
296 else:
297
298 raise ValidationFailure, 'unknown ECDSA curve'
299 keyptr = candidate_key.key
300 x = Crypto.Util.number.bytes_to_long(keyptr[0:key_len])
301 y = Crypto.Util.number.bytes_to_long(keyptr[key_len:key_len * 2])
302 assert ecdsa.ecdsa.point_is_valid(curve.generator, x, y)
303 point = ecdsa.ellipticcurve.Point(curve.curve, x, y, curve.order)
304 verifying_key = ecdsa.keys.VerifyingKey.from_public_point(point,
305 curve)
306 pubkey = ECKeyWrapper(verifying_key, key_len)
307 r = rrsig.signature[:key_len]
308 s = rrsig.signature[key_len:]
309 sig = ecdsa.ecdsa.Signature(Crypto.Util.number.bytes_to_long(r),
310 Crypto.Util.number.bytes_to_long(s))
311 else:
312 raise ValidationFailure, 'unknown algorithm %u' % rrsig.algorithm
313
314 hash.update(_to_rdata(rrsig, origin)[:18])
315 hash.update(rrsig.signer.to_digestable(origin))
316
317 if rrsig.labels < len(rrname) - 1:
318 suffix = rrname.split(rrsig.labels + 1)[1]
319 rrname = dns.name.from_text('*', suffix)
320 rrnamebuf = rrname.to_digestable(origin)
321 rrfixed = struct.pack('!HHI', rdataset.rdtype, rdataset.rdclass,
322 rrsig.original_ttl)
323 rrlist = sorted(rdataset);
324 for rr in rrlist:
325 hash.update(rrnamebuf)
326 hash.update(rrfixed)
327 rrdata = rr.to_digestable(origin)
328 rrlen = struct.pack('!H', len(rrdata))
329 hash.update(rrlen)
330 hash.update(rrdata)
331
332 digest = hash.digest()
333
334 if _is_rsa(rrsig.algorithm):
335
336 digest = _make_algorithm_id(rrsig.algorithm) + digest
337 padlen = keylen // 8 - len(digest) - 3
338 digest = chr(0) + chr(1) + chr(0xFF) * padlen + chr(0) + digest
339 elif _is_dsa(rrsig.algorithm) or _is_ecdsa(rrsig.algorithm):
340 pass
341 else:
342
343
344
345 raise ValidationFailure, 'unknown algorithm %u' % rrsig.algorithm
346
347 if pubkey.verify(digest, sig):
348 return
349 raise ValidationFailure, 'verify failure'
350
def _validate(rrset, rrsigset, keys, origin=None, now=None):
    """Validate an RRset

    The RRset is accepted as soon as any one signature in I{rrsigset}
    validates; failures of individual signatures are ignored as long as
    another one succeeds.

    @param rrset: The RRset to validate
    @type rrset: dns.rrset.RRset or (dns.name.Name, dns.rdataset.Rdataset)
    tuple
    @param rrsigset: The signature RRset
    @type rrsigset: dns.rrset.RRset or (dns.name.Name, dns.rdataset.Rdataset)
    tuple
    @param keys: The key dictionary.
    @type keys: a dictionary keyed by dns.name.Name with node or rdataset values
    @param origin: The origin to use for relative names.  Text is converted
    to an absolute name.
    @type origin: dns.name.Name, string, or None
    @param now: The time to use when validating the signatures.  The default
    is the current time.
    @type now: int
    @raises ValidationFailure: the owner names of I{rrset} and I{rrsigset}
    differ, or no signature in I{rrsigset} validated
    """

    if isinstance(origin, (str, unicode)):
        origin = dns.name.from_text(origin, dns.name.root)

    if isinstance(rrset, tuple):
        rrname = rrset[0]
    else:
        rrname = rrset.name

    if isinstance(rrsigset, tuple):
        rrsigname = rrsigset[0]
        rrsigrdataset = rrsigset[1]
    else:
        rrsigname = rrsigset.name
        rrsigrdataset = rrsigset

    # Put both owner names into the same relativity before comparing them.
    # Each name must be normalized from itself: deriving rrsigname from
    # rrname would make the comparison below always succeed.
    rrname = rrname.choose_relativity(origin)
    rrsigname = rrsigname.choose_relativity(origin)
    if rrname != rrsigname:
        raise ValidationFailure("owner names do not match")

    for rrsig in rrsigrdataset:
        try:
            _validate_rrsig(rrset, rrsig, keys, origin, now)
            return
        except ValidationFailure:
            # Deliberate: one bad signature must not prevent trying the
            # remaining ones.
            pass
    raise ValidationFailure("no RRSIGs validated")
396
398 raise NotImplementedError, "DNSSEC validation requires pycrypto"
399
# Bind the public validation entry points according to whether pycrypto can
# be imported: the real implementations when it can, _need_pycrypto when it
# cannot.
try:
    import Crypto.PublicKey.RSA
    import Crypto.PublicKey.DSA
    import Crypto.Util.number
except ImportError:
    _have_pycrypto = False
    validate = _need_pycrypto
    validate_rrsig = _need_pycrypto
else:
    _have_pycrypto = True
    validate = _validate
    validate_rrsig = _validate_rrsig
411
412 try:
413 import ecdsa
414 import ecdsa.ecdsa
415 import ecdsa.ellipticcurve
416 import ecdsa.keys
417 _have_ecdsa = True
418
421 self.key = key
422 self.key_len = key_len
423 - def verify(self, digest, sig):
424 diglong = Crypto.Util.number.bytes_to_long(digest)
425 return self.key.pubkey.verifies(diglong, sig)
426
427 except ImportError:
428 _have_ecdsa = False
429