mirror of
https://salsa.debian.org/freeipa-team/freeipa.git
synced 2025-02-25 18:55:28 -06:00
The x509.subject_base() function is only used in tests. During the recent certificate refactoring, we had to get rid of the ipalib.x509 import from the module scope so that there were no circular dependecies and add it exactly to this funcion which is not used in the production code. Reviewed-By: Tibor Dudlak <tdudlak@redhat.com> Reviewed-By: Rob Crittenden <rcritten@redhat.com>
681 lines
20 KiB
Python
681 lines
20 KiB
Python
# Authors:
|
|
# Rob Crittenden <rcritten@redhat.com>
|
|
#
|
|
# Copyright (C) 2010 Red Hat
|
|
# see file 'COPYING' for use and warranty information
|
|
#
|
|
# This program is free software; you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
# Certificates should be stored internally DER-encoded. We can be passed
|
|
# a certificate several ways: read if from LDAP, read it from a 3rd party
|
|
# app (dogtag, candlepin, etc) or as user input.
|
|
|
|
# Conventions
|
|
#
|
|
# Where possible the following naming conventions are used:
|
|
#
|
|
# cert: the certificate is a PEM-encoded certificate
|
|
# dercert: the certificate is DER-encoded
|
|
# rawcert: the cert is in an unknown format
|
|
|
|
from __future__ import print_function
|
|
|
|
import binascii
|
|
import datetime
|
|
import ipaddress
|
|
import ssl
|
|
import base64
|
|
import re
|
|
|
|
from cryptography import x509 as crypto_x509
|
|
from cryptography import utils as crypto_utils
|
|
from cryptography.hazmat.backends import default_backend
|
|
from cryptography.hazmat.primitives.serialization import (
|
|
Encoding, PublicFormat
|
|
)
|
|
from pyasn1.type import univ, char, namedtype, tag
|
|
from pyasn1.codec.der import decoder, encoder
|
|
# from pyasn1.codec.native import decoder, encoder
|
|
from pyasn1_modules import rfc2315, rfc2459
|
|
import six
|
|
|
|
from ipalib import errors
|
|
from ipapython.dnsutil import DNSName
|
|
|
|
if six.PY3:
|
|
unicode = str
|
|
|
|
PEM = 0
|
|
DER = 1
|
|
|
|
PEM_REGEX = re.compile(
|
|
b'-----BEGIN CERTIFICATE-----.*?-----END CERTIFICATE-----',
|
|
re.DOTALL)
|
|
|
|
EKU_SERVER_AUTH = '1.3.6.1.5.5.7.3.1'
|
|
EKU_CLIENT_AUTH = '1.3.6.1.5.5.7.3.2'
|
|
EKU_CODE_SIGNING = '1.3.6.1.5.5.7.3.3'
|
|
EKU_EMAIL_PROTECTION = '1.3.6.1.5.5.7.3.4'
|
|
EKU_PKINIT_CLIENT_AUTH = '1.3.6.1.5.2.3.4'
|
|
EKU_PKINIT_KDC = '1.3.6.1.5.2.3.5'
|
|
EKU_ANY = '2.5.29.37.0'
|
|
EKU_PLACEHOLDER = '1.3.6.1.4.1.3319.6.10.16'
|
|
|
|
SAN_UPN = '1.3.6.1.4.1.311.20.2.3'
|
|
SAN_KRB5PRINCIPALNAME = '1.3.6.1.5.2.2'
|
|
|
|
|
|
@crypto_utils.register_interface(crypto_x509.Certificate)
|
|
class IPACertificate(object):
|
|
"""
|
|
A proxy class wrapping a python-cryptography certificate representation for
|
|
FreeIPA purposes
|
|
"""
|
|
def __init__(self, cert, backend=None):
|
|
"""
|
|
:param cert: A python-cryptography Certificate object
|
|
:param backend: A python-cryptography Backend object
|
|
"""
|
|
self._cert = cert
|
|
self.backend = default_backend() if backend is None else backend()
|
|
|
|
# initialize the certificate fields
|
|
# we have to do it this way so that some systems don't explode since
|
|
# some field types encode-decoding is not strongly defined
|
|
self._subject = self.__get_der_field('subject')
|
|
self._issuer = self.__get_der_field('issuer')
|
|
self._serial_number = self.__get_der_field('serialNumber')
|
|
|
|
def __getstate__(self):
|
|
state = {
|
|
'_cert': self.public_bytes(Encoding.DER),
|
|
'_subject': self.subject_bytes,
|
|
'_issuer': self.issuer_bytes,
|
|
'_serial_number': self._serial_number,
|
|
}
|
|
return state
|
|
|
|
def __setstate__(self, state):
|
|
self._subject = state['_subject']
|
|
self._issuer = state['_issuer']
|
|
self._issuer = state['_serial_number']
|
|
self._cert = crypto_x509.load_der_x509_certificate(
|
|
state['_cert'], backend=default_backend())
|
|
|
|
def __eq__(self, other):
|
|
"""
|
|
Checks equality.
|
|
|
|
:param other: either cryptography.Certificate or IPACertificate or
|
|
bytes representing a DER-formatted certificate
|
|
"""
|
|
if (isinstance(other, (crypto_x509.Certificate, IPACertificate))):
|
|
return (self.public_bytes(Encoding.DER) ==
|
|
other.public_bytes(Encoding.DER))
|
|
elif isinstance(other, bytes):
|
|
return self.public_bytes(Encoding.DER) == other
|
|
else:
|
|
return False
|
|
|
|
def __ne__(self, other):
|
|
"""
|
|
Checks not equal.
|
|
"""
|
|
return not self.__eq__(other)
|
|
|
|
def __hash__(self):
|
|
"""
|
|
Computes a hash of the wrapped cryptography.Certificate.
|
|
"""
|
|
return hash(self._cert)
|
|
|
|
def __encode_extension(self, oid, critical, value):
|
|
# TODO: have another proxy for crypto_x509.Extension which would
|
|
# provide public_bytes on the top of what python-cryptography has
|
|
ext = rfc2459.Extension()
|
|
# TODO: this does not have to be so weird, pyasn1 now has codecs
|
|
# which are capable of providing python-native types
|
|
ext['extnID'] = univ.ObjectIdentifier(oid)
|
|
ext['critical'] = univ.Boolean(critical)
|
|
ext['extnValue'] = univ.Any(encoder.encode(univ.OctetString(value)))
|
|
ext = encoder.encode(ext)
|
|
return ext
|
|
|
|
def __get_pyasn1_field(self, field):
|
|
"""
|
|
:returns: a field of the certificate in pyasn1 representation
|
|
"""
|
|
cert_bytes = self.tbs_certificate_bytes
|
|
cert = decoder.decode(cert_bytes, rfc2459.TBSCertificate())[0]
|
|
field = cert[field]
|
|
return field
|
|
|
|
def __get_der_field(self, field):
|
|
"""
|
|
:field: the name of the field of the certificate
|
|
:returns: bytes representing the value of a certificate field
|
|
"""
|
|
return encoder.encode(self.__get_pyasn1_field(field))
|
|
|
|
def public_bytes(self, encoding):
|
|
"""
|
|
Serializes the certificate to PEM or DER format.
|
|
"""
|
|
return self._cert.public_bytes(encoding)
|
|
|
|
def is_self_signed(self):
|
|
"""
|
|
:returns: True if this certificate is self-signed, False otherwise
|
|
"""
|
|
return self._cert.issuer == self._cert.subject
|
|
|
|
def fingerprint(self, algorithm):
|
|
"""
|
|
Counts fingerprint of the wrapped cryptography.Certificate
|
|
"""
|
|
return self._cert.fingerprint(algorithm)
|
|
|
|
@property
|
|
def serial_number(self):
|
|
return self._cert.serial_number
|
|
|
|
@property
|
|
def serial_number_bytes(self):
|
|
return self._serial_number
|
|
|
|
@property
|
|
def version(self):
|
|
return self._cert.version
|
|
|
|
@property
|
|
def subject(self):
|
|
return self._cert.subject
|
|
|
|
@property
|
|
def subject_bytes(self):
|
|
return self._subject
|
|
|
|
@property
|
|
def signature_hash_algorithm(self):
|
|
"""
|
|
Returns a HashAlgorithm corresponding to the type of the digest signed
|
|
in the certificate.
|
|
"""
|
|
return self._cert.signature_hash_algorithm
|
|
|
|
@property
|
|
def signature_algorithm_oid(self):
|
|
"""
|
|
Returns the ObjectIdentifier of the signature algorithm.
|
|
"""
|
|
return self._cert.signature_algorithm_oid
|
|
|
|
@property
|
|
def signature(self):
|
|
"""
|
|
Returns the signature bytes.
|
|
"""
|
|
return self._cert.signature
|
|
|
|
@property
|
|
def issuer(self):
|
|
return self._cert.issuer
|
|
|
|
@property
|
|
def issuer_bytes(self):
|
|
return self._issuer
|
|
|
|
@property
|
|
def not_valid_before(self):
|
|
return self._cert.not_valid_before
|
|
|
|
@property
|
|
def not_valid_after(self):
|
|
return self._cert.not_valid_after
|
|
|
|
@property
|
|
def tbs_certificate_bytes(self):
|
|
return self._cert.tbs_certificate_bytes
|
|
|
|
@property
|
|
def extensions(self):
|
|
# TODO: own Extension and Extensions classes proxying
|
|
# python-cryptography
|
|
return self._cert.extensions
|
|
|
|
def public_key(self):
|
|
return self._cert.public_key()
|
|
|
|
@property
|
|
def public_key_info_bytes(self):
|
|
return self._cert.public_key().public_bytes(
|
|
encoding=Encoding.DER, format=PublicFormat.SubjectPublicKeyInfo)
|
|
|
|
@property
|
|
def extended_key_usage(self):
|
|
try:
|
|
ext_key_usage = self._cert.extensions.get_extension_for_oid(
|
|
crypto_x509.oid.ExtensionOID.EXTENDED_KEY_USAGE).value
|
|
except crypto_x509.ExtensionNotFound:
|
|
return None
|
|
|
|
return set(oid.dotted_string for oid in ext_key_usage)
|
|
|
|
@property
|
|
def extended_key_usage_bytes(self):
|
|
eku = self.extended_key_usage
|
|
if eku is None:
|
|
return
|
|
|
|
ekurfc = rfc2459.ExtKeyUsageSyntax()
|
|
for i, oid in enumerate(eku):
|
|
ekurfc[i] = univ.ObjectIdentifier(oid)
|
|
ekurfc = encoder.encode(ekurfc)
|
|
return self.__encode_extension('2.5.29.37', EKU_ANY not in eku, ekurfc)
|
|
|
|
@property
|
|
def san_general_names(self):
|
|
"""
|
|
Return SAN general names from a python-cryptography
|
|
certificate object. If the SAN extension is not present,
|
|
return an empty sequence.
|
|
|
|
Because python-cryptography does not yet provide a way to
|
|
handle unrecognised critical extensions (which may occur),
|
|
we must parse the certificate and extract the General Names.
|
|
For uniformity with other code, we manually construct values
|
|
of python-crytography GeneralName subtypes.
|
|
|
|
python-cryptography does not yet provide types for
|
|
ediPartyName or x400Address, so we drop these name types.
|
|
|
|
otherNames are NOT instantiated to more specific types where
|
|
the type is known. Use ``process_othernames`` to do that.
|
|
|
|
When python-cryptography can handle certs with unrecognised
|
|
critical extensions and implements ediPartyName and
|
|
x400Address, this function (and helpers) will be redundant
|
|
and should go away.
|
|
|
|
"""
|
|
gns = self.__pyasn1_get_san_general_names()
|
|
|
|
GENERAL_NAME_CONSTRUCTORS = {
|
|
'rfc822Name': lambda x: crypto_x509.RFC822Name(unicode(x)),
|
|
'dNSName': lambda x: crypto_x509.DNSName(unicode(x)),
|
|
'directoryName': _pyasn1_to_cryptography_directoryname,
|
|
'registeredID': _pyasn1_to_cryptography_registeredid,
|
|
'iPAddress': _pyasn1_to_cryptography_ipaddress,
|
|
'uniformResourceIdentifier':
|
|
lambda x: crypto_x509.UniformResourceIdentifier(unicode(x)),
|
|
'otherName': _pyasn1_to_cryptography_othername,
|
|
}
|
|
|
|
result = []
|
|
|
|
for gn in gns:
|
|
gn_type = gn.getName()
|
|
if gn_type in GENERAL_NAME_CONSTRUCTORS:
|
|
result.append(
|
|
GENERAL_NAME_CONSTRUCTORS[gn_type](gn.getComponent()))
|
|
|
|
return result
|
|
|
|
def __pyasn1_get_san_general_names(self):
|
|
# pyasn1 returns None when the key is not present in the certificate
|
|
# but we need an iterable
|
|
extensions = self.__get_pyasn1_field('extensions') or []
|
|
OID_SAN = univ.ObjectIdentifier('2.5.29.17')
|
|
gns = []
|
|
for ext in extensions:
|
|
if ext['extnID'] == OID_SAN:
|
|
der = decoder.decode(
|
|
ext['extnValue'], asn1Spec=univ.OctetString())[0]
|
|
gns = decoder.decode(der, asn1Spec=rfc2459.SubjectAltName())[0]
|
|
break
|
|
return gns
|
|
|
|
@property
|
|
def san_a_label_dns_names(self):
|
|
gns = self.__pyasn1_get_san_general_names()
|
|
result = []
|
|
|
|
for gn in gns:
|
|
if gn.getName() == 'dNSName':
|
|
result.append(unicode(gn.getComponent()))
|
|
|
|
return result
|
|
|
|
def match_hostname(self, hostname):
|
|
match_cert = {}
|
|
|
|
match_cert['subject'] = match_subject = []
|
|
for rdn in self._cert.subject.rdns:
|
|
match_rdn = []
|
|
for ava in rdn:
|
|
if ava.oid == crypto_x509.oid.NameOID.COMMON_NAME:
|
|
match_rdn.append(('commonName', ava.value))
|
|
match_subject.append(match_rdn)
|
|
|
|
values = self.san_a_label_dns_names
|
|
if values:
|
|
match_cert['subjectAltName'] = match_san = []
|
|
for value in values:
|
|
match_san.append(('DNS', value))
|
|
|
|
ssl.match_hostname(match_cert, DNSName(hostname).ToASCII())
|
|
|
|
|
|
def load_pem_x509_certificate(data):
|
|
"""
|
|
Load an X.509 certificate in PEM format.
|
|
|
|
:returns: a ``IPACertificate`` object.
|
|
:raises: ``ValueError`` if unable to load the certificate.
|
|
"""
|
|
return IPACertificate(
|
|
crypto_x509.load_pem_x509_certificate(data, backend=default_backend())
|
|
)
|
|
|
|
|
|
def load_der_x509_certificate(data):
|
|
"""
|
|
Load an X.509 certificate in DER format.
|
|
|
|
:returns: a ``IPACertificate`` object.
|
|
:raises: ``ValueError`` if unable to load the certificate.
|
|
"""
|
|
return IPACertificate(
|
|
crypto_x509.load_der_x509_certificate(data, backend=default_backend())
|
|
)
|
|
|
|
|
|
def load_unknown_x509_certificate(data):
|
|
"""
|
|
Only use this function when you can't be sure what kind of format does
|
|
your certificate have, e.g. input certificate files in installers
|
|
|
|
:returns: a ``IPACertificate`` object.
|
|
:raises: ``ValueError`` if unable to load the certificate.
|
|
"""
|
|
try:
|
|
return load_pem_x509_certificate(data)
|
|
except ValueError:
|
|
return load_der_x509_certificate(data)
|
|
|
|
|
|
def load_certificate_from_file(filename, dbdir=None):
|
|
"""
|
|
Load a certificate from a PEM file.
|
|
|
|
Returns a python-cryptography ``Certificate`` object.
|
|
"""
|
|
with open(filename, mode='rb') as f:
|
|
return load_pem_x509_certificate(f.read())
|
|
|
|
|
|
def load_certificate_list(data):
|
|
"""
|
|
Load a certificate list from a sequence of concatenated PEMs.
|
|
|
|
Return a list of python-cryptography ``Certificate`` objects.
|
|
"""
|
|
certs = PEM_REGEX.findall(data)
|
|
return [load_pem_x509_certificate(cert) for cert in certs]
|
|
|
|
|
|
def load_certificate_list_from_file(filename):
|
|
"""
|
|
Load a certificate list from a PEM file.
|
|
|
|
Return a list of python-cryptography ``Certificate`` objects.
|
|
|
|
"""
|
|
with open(filename, 'rb') as f:
|
|
return load_certificate_list(f.read())
|
|
|
|
|
|
def pkcs7_to_certs(data, datatype=PEM):
|
|
"""
|
|
Extract certificates from a PKCS #7 object.
|
|
|
|
:returns: a ``list`` of ``IPACertificate`` objects.
|
|
"""
|
|
if datatype == PEM:
|
|
match = re.match(
|
|
br'-----BEGIN PKCS7-----(.*?)-----END PKCS7-----',
|
|
data,
|
|
re.DOTALL)
|
|
if not match:
|
|
raise ValueError("not a valid PKCS#7 PEM")
|
|
|
|
data = base64.b64decode(match.group(1))
|
|
|
|
content_info, tail = decoder.decode(data, rfc2315.ContentInfo())
|
|
if tail:
|
|
raise ValueError("not a valid PKCS#7 message")
|
|
|
|
if content_info['contentType'] != rfc2315.signedData:
|
|
raise ValueError("not a PKCS#7 signed data message")
|
|
|
|
signed_data, tail = decoder.decode(bytes(content_info['content']),
|
|
rfc2315.SignedData())
|
|
if tail:
|
|
raise ValueError("not a valid PKCS#7 signed data message")
|
|
|
|
result = []
|
|
|
|
for certificate in signed_data['certificates']:
|
|
certificate = encoder.encode(certificate)
|
|
certificate = load_der_x509_certificate(certificate)
|
|
result.append(certificate)
|
|
|
|
return result
|
|
|
|
|
|
def validate_pem_x509_certificate(cert):
|
|
"""
|
|
Perform cert validation by trying to load it via python-cryptography.
|
|
"""
|
|
try:
|
|
load_pem_x509_certificate(cert)
|
|
except ValueError as e:
|
|
raise errors.CertificateFormatError(error=str(e))
|
|
|
|
|
|
def validate_der_x509_certificate(cert):
|
|
"""
|
|
Perform cert validation by trying to load it via python-cryptography.
|
|
"""
|
|
try:
|
|
load_der_x509_certificate(cert)
|
|
except ValueError as e:
|
|
raise errors.CertificateFormatError(error=str(e))
|
|
|
|
|
|
def write_certificate(cert, filename):
|
|
"""
|
|
Write the certificate to a file in PEM format.
|
|
|
|
The cert value can be either DER or PEM-encoded, it will be normalized
|
|
to DER regardless, then back out to PEM.
|
|
"""
|
|
|
|
try:
|
|
with open(filename, 'wb') as fp:
|
|
fp.write(cert.public_bytes(Encoding.PEM))
|
|
except (IOError, OSError) as e:
|
|
raise errors.FileError(reason=str(e))
|
|
|
|
|
|
def write_certificate_list(certs, filename):
|
|
"""
|
|
Write a list of certificates to a file in PEM format.
|
|
|
|
:param certs: a list of IPACertificate objects to be written to a file
|
|
:param filename: a path to the file the certificates should be written into
|
|
"""
|
|
|
|
try:
|
|
with open(filename, 'wb') as f:
|
|
for cert in certs:
|
|
f.write(cert.public_bytes(Encoding.PEM))
|
|
except (IOError, OSError) as e:
|
|
raise errors.FileError(reason=str(e))
|
|
|
|
|
|
class _PrincipalName(univ.Sequence):
|
|
componentType = namedtype.NamedTypes(
|
|
namedtype.NamedType('name-type', univ.Integer().subtype(
|
|
explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 0))
|
|
),
|
|
namedtype.NamedType('name-string', univ.SequenceOf(char.GeneralString()).subtype(
|
|
explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 1))
|
|
),
|
|
)
|
|
|
|
|
|
class _KRB5PrincipalName(univ.Sequence):
|
|
componentType = namedtype.NamedTypes(
|
|
namedtype.NamedType('realm', char.GeneralString().subtype(
|
|
explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 0))
|
|
),
|
|
namedtype.NamedType('principalName', _PrincipalName().subtype(
|
|
explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 1))
|
|
),
|
|
)
|
|
|
|
|
|
def _decode_krb5principalname(data):
|
|
principal = decoder.decode(data, asn1Spec=_KRB5PrincipalName())[0]
|
|
realm = (unicode(principal['realm']).replace('\\', '\\\\')
|
|
.replace('@', '\\@'))
|
|
name = principal['principalName']['name-string']
|
|
name = u'/'.join(unicode(n).replace('\\', '\\\\')
|
|
.replace('/', '\\/')
|
|
.replace('@', '\\@') for n in name)
|
|
name = u'%s@%s' % (name, realm)
|
|
return name
|
|
|
|
|
|
class KRB5PrincipalName(crypto_x509.general_name.OtherName):
|
|
def __init__(self, type_id, value):
|
|
super(KRB5PrincipalName, self).__init__(type_id, value)
|
|
self.name = _decode_krb5principalname(value)
|
|
|
|
|
|
class UPN(crypto_x509.general_name.OtherName):
|
|
def __init__(self, type_id, value):
|
|
super(UPN, self).__init__(type_id, value)
|
|
self.name = unicode(
|
|
decoder.decode(value, asn1Spec=char.UTF8String())[0])
|
|
|
|
|
|
OTHERNAME_CLASS_MAP = {
|
|
SAN_KRB5PRINCIPALNAME: KRB5PrincipalName,
|
|
SAN_UPN: UPN,
|
|
}
|
|
|
|
|
|
def process_othernames(gns):
|
|
"""
|
|
Process python-cryptography GeneralName values, yielding
|
|
OtherName values of more specific type if type is known.
|
|
|
|
"""
|
|
for gn in gns:
|
|
if isinstance(gn, crypto_x509.general_name.OtherName):
|
|
cls = OTHERNAME_CLASS_MAP.get(
|
|
gn.type_id.dotted_string,
|
|
crypto_x509.general_name.OtherName)
|
|
yield cls(gn.type_id, gn.value)
|
|
else:
|
|
yield gn
|
|
|
|
|
|
def _pyasn1_to_cryptography_directoryname(dn):
|
|
attrs = []
|
|
|
|
# Name is CHOICE { RDNSequence } (only one possibility)
|
|
for rdn in dn.getComponent():
|
|
for ava in rdn:
|
|
attr = crypto_x509.NameAttribute(
|
|
_pyasn1_to_cryptography_oid(ava['type']),
|
|
unicode(decoder.decode(ava['value'])[0])
|
|
)
|
|
attrs.append(attr)
|
|
|
|
return crypto_x509.DirectoryName(crypto_x509.Name(attrs))
|
|
|
|
|
|
def _pyasn1_to_cryptography_registeredid(oid):
|
|
return crypto_x509.RegisteredID(_pyasn1_to_cryptography_oid(oid))
|
|
|
|
|
|
def _pyasn1_to_cryptography_ipaddress(octet_string):
|
|
return crypto_x509.IPAddress(
|
|
ipaddress.ip_address(bytes(octet_string)))
|
|
|
|
|
|
def _pyasn1_to_cryptography_othername(on):
|
|
return crypto_x509.OtherName(
|
|
_pyasn1_to_cryptography_oid(on['type-id']),
|
|
bytes(on['value'])
|
|
)
|
|
|
|
|
|
def _pyasn1_to_cryptography_oid(oid):
|
|
return crypto_x509.ObjectIdentifier(str(oid))
|
|
|
|
|
|
def chunk(size, s):
|
|
"""Yield chunks of the specified size from the given string.
|
|
|
|
The input must be a multiple of the chunk size (otherwise
|
|
trailing characters are dropped).
|
|
|
|
Works on character strings only.
|
|
|
|
"""
|
|
return (u''.join(span) for span in six.moves.zip(*[iter(s)] * size))
|
|
|
|
|
|
def add_colons(s):
|
|
"""Add colons between each nibble pair in a hex string."""
|
|
return u':'.join(chunk(2, s))
|
|
|
|
|
|
def to_hex_with_colons(bs):
|
|
"""Convert bytes to a hex string with colons."""
|
|
return add_colons(binascii.hexlify(bs).decode('utf-8'))
|
|
|
|
|
|
class UTC(datetime.tzinfo):
|
|
ZERO = datetime.timedelta(0)
|
|
|
|
def tzname(self, dt):
|
|
return "UTC"
|
|
|
|
def utcoffset(self, dt):
|
|
return self.ZERO
|
|
|
|
def dst(self, dt):
|
|
return self.ZERO
|
|
|
|
|
|
def format_datetime(t):
|
|
if t.tzinfo is None:
|
|
t = t.replace(tzinfo=UTC())
|
|
return unicode(t.strftime("%a %b %d %H:%M:%S %Y %Z"))
|