freeipa/ipaserver/p11helper.py
Christian Heimes 5c09dcdb98 Grammar: whitespace is a word
Signed-off-by: Christian Heimes <cheimes@redhat.com>
Reviewed-By: Alexander Bokovoy <abbra@users.noreply.github.com>
Reviewed-By: Rob Crittenden <rcritten@redhat.com>
2020-06-23 10:16:29 +02:00

1867 lines
64 KiB
Python

#
# Copyright (C) 2014 FreeIPA Contributors see COPYING for license
#
import random
import ctypes.util
import binascii
import struct
import six
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import dsa, ec, rsa
from cffi import FFI
if six.PY3:
unicode = str
_ffi = FFI()
_ffi.cdef('''
/* p11-kit/pkcs11.h */
typedef unsigned long CK_FLAGS;
struct _CK_VERSION
{
unsigned char major;
unsigned char minor;
};
typedef unsigned long CK_SLOT_ID;
typedef CK_SLOT_ID *CK_SLOT_ID_PTR;
typedef unsigned long CK_SESSION_HANDLE;
typedef unsigned long CK_USER_TYPE;
typedef unsigned long CK_OBJECT_HANDLE;
typedef unsigned long CK_OBJECT_CLASS;
typedef unsigned long CK_KEY_TYPE;
typedef unsigned long CK_ATTRIBUTE_TYPE;
typedef unsigned long ck_flags_t;
typedef unsigned char CK_BBOOL;
typedef unsigned long int CK_ULONG;
typedef CK_ULONG *CK_ULONG_PTR;
struct _CK_ATTRIBUTE
{
CK_ATTRIBUTE_TYPE type;
void *pValue;
unsigned long ulValueLen;
};
typedef unsigned long CK_MECHANISM_TYPE;
struct _CK_MECHANISM
{
CK_MECHANISM_TYPE mechanism;
void *pParameter;
unsigned long ulParameterLen;
};
struct _CK_TOKEN_INFO
{
unsigned char label[32];
unsigned char manufacturer_id[32];
unsigned char model[16];
unsigned char serial_number[16];
ck_flags_t flags;
unsigned long max_session_count;
unsigned long session_count;
unsigned long max_rw_session_count;
unsigned long rw_session_count;
unsigned long max_pin_len;
unsigned long min_pin_len;
unsigned long total_public_memory;
unsigned long free_public_memory;
unsigned long total_private_memory;
unsigned long free_private_memory;
struct _CK_VERSION hardware_version;
struct _CK_VERSION firmware_version;
unsigned char utc_time[16];
};
typedef struct _CK_TOKEN_INFO CK_TOKEN_INFO;
typedef CK_TOKEN_INFO *CK_TOKEN_INFO_PTR;
typedef unsigned long CK_RV;
typedef ... *CK_NOTIFY;
struct _CK_FUNCTION_LIST;
typedef CK_RV (*CK_C_Initialize) (void *init_args);
typedef CK_RV (*CK_C_Finalize) (void *pReserved);
typedef ... *CK_C_GetInfo;
typedef ... *CK_C_GetFunctionList;
CK_RV C_GetFunctionList (struct _CK_FUNCTION_LIST **function_list);
typedef CK_RV (*CK_C_GetSlotList) (CK_BBOOL tokenPresent,
CK_SLOT_ID_PTR pSlotList,
CK_ULONG_PTR pulCount);
typedef ... *CK_C_GetSlotInfo;
typedef CK_RV (*CK_C_GetTokenInfo) (CK_SLOT_ID slotID,
CK_TOKEN_INFO_PTR pInfo);
typedef ... *CK_C_WaitForSlotEvent;
typedef ... *CK_C_GetMechanismList;
typedef ... *CK_C_GetMechanismInfo;
typedef ... *CK_C_InitToken;
typedef ... *CK_C_InitPIN;
typedef ... *CK_C_SetPIN;
typedef CK_RV (*CK_C_OpenSession) (CK_SLOT_ID slotID, CK_FLAGS flags,
void *application, CK_NOTIFY notify,
CK_SESSION_HANDLE *session);
typedef CK_RV (*CK_C_CloseSession) (CK_SESSION_HANDLE session);
typedef ... *CK_C_CloseAllSessions;
typedef ... *CK_C_GetSessionInfo;
typedef ... *CK_C_GetOperationState;
typedef ... *CK_C_SetOperationState;
typedef CK_RV (*CK_C_Login) (CK_SESSION_HANDLE session, CK_USER_TYPE user_type,
unsigned char *pin, unsigned long pin_len);
typedef CK_RV (*CK_C_Logout) (CK_SESSION_HANDLE session);
typedef CK_RV (*CK_C_CreateObject) (CK_SESSION_HANDLE session,
struct _CK_ATTRIBUTE *templ,
unsigned long count,
CK_OBJECT_HANDLE *object);
typedef ... *CK_C_CopyObject;
typedef CK_RV (*CK_C_DestroyObject) (CK_SESSION_HANDLE session,
CK_OBJECT_HANDLE object);
typedef ... *CK_C_GetObjectSize;
typedef CK_RV (*CK_C_GetAttributeValue) (CK_SESSION_HANDLE session,
CK_OBJECT_HANDLE object,
struct _CK_ATTRIBUTE *templ,
unsigned long count);
typedef CK_RV (*CK_C_SetAttributeValue) (CK_SESSION_HANDLE session,
CK_OBJECT_HANDLE object,
struct _CK_ATTRIBUTE *templ,
unsigned long count);
typedef CK_RV (*CK_C_FindObjectsInit) (CK_SESSION_HANDLE session,
struct _CK_ATTRIBUTE *templ,
unsigned long count);
typedef CK_RV (*CK_C_FindObjects) (CK_SESSION_HANDLE session,
CK_OBJECT_HANDLE *object,
unsigned long max_object_count,
unsigned long *object_count);
typedef CK_RV (*CK_C_FindObjectsFinal) (CK_SESSION_HANDLE session);
typedef ... *CK_C_EncryptInit;
typedef ... *CK_C_Encrypt;
typedef ... *CK_C_EncryptUpdate;
typedef ... *CK_C_EncryptFinal;
typedef ... *CK_C_DecryptInit;
typedef ... *CK_C_Decrypt;
typedef ... *CK_C_DecryptUpdate;
typedef ... *CK_C_DecryptFinal;
typedef ... *CK_C_DigestInit;
typedef ... *CK_C_Digest;
typedef ... *CK_C_DigestUpdate;
typedef ... *CK_C_DigestKey;
typedef ... *CK_C_DigestFinal;
typedef ... *CK_C_SignInit;
typedef ... *CK_C_Sign;
typedef ... *CK_C_SignUpdate;
typedef ... *CK_C_SignFinal;
typedef ... *CK_C_SignRecoverInit;
typedef ... *CK_C_SignRecover;
typedef ... *CK_C_VerifyInit;
typedef ... *CK_C_Verify;
typedef ... *CK_C_VerifyUpdate;
typedef ... *CK_C_VerifyFinal;
typedef ... *CK_C_VerifyRecoverInit;
typedef ... *CK_C_VerifyRecover;
typedef ... *CK_C_DigestEncryptUpdate;
typedef ... *CK_C_DecryptDigestUpdate;
typedef ... *CK_C_SignEncryptUpdate;
typedef ... *CK_C_DecryptVerifyUpdate;
typedef CK_RV (*CK_C_GenerateKey) (CK_SESSION_HANDLE session,
struct _CK_MECHANISM *mechanism,
struct _CK_ATTRIBUTE *templ,
unsigned long count,
CK_OBJECT_HANDLE *key);
typedef CK_RV (*CK_C_GenerateKeyPair) (CK_SESSION_HANDLE session,
struct _CK_MECHANISM *mechanism,
struct _CK_ATTRIBUTE *
public_key_template,
unsigned long
public_key_attribute_count,
struct _CK_ATTRIBUTE *
private_key_template,
unsigned long
private_key_attribute_count,
CK_OBJECT_HANDLE *public_key,
CK_OBJECT_HANDLE *private_key);
typedef CK_RV (*CK_C_WrapKey) (CK_SESSION_HANDLE session,
struct _CK_MECHANISM *mechanism,
CK_OBJECT_HANDLE wrapping_key,
CK_OBJECT_HANDLE key,
unsigned char *wrapped_key,
unsigned long *wrapped_key_len);
typedef CK_RV (*CK_C_UnwrapKey) (CK_SESSION_HANDLE session,
struct _CK_MECHANISM *mechanism,
CK_OBJECT_HANDLE unwrapping_key,
unsigned char *wrapped_key,
unsigned long wrapped_key_len,
struct _CK_ATTRIBUTE *templ,
unsigned long attribute_count,
CK_OBJECT_HANDLE *key);
typedef ... *CK_C_DeriveKey;
typedef ... *CK_C_SeedRandom;
typedef ... *CK_C_GenerateRandom;
typedef ... *CK_C_GetFunctionStatus;
typedef ... *CK_C_CancelFunction;
struct _CK_FUNCTION_LIST
{
struct _CK_VERSION version;
CK_C_Initialize C_Initialize;
CK_C_Finalize C_Finalize;
CK_C_GetInfo C_GetInfo;
CK_C_GetFunctionList C_GetFunctionList;
CK_C_GetSlotList C_GetSlotList;
CK_C_GetSlotInfo C_GetSlotInfo;
CK_C_GetTokenInfo C_GetTokenInfo;
CK_C_GetMechanismList C_GetMechanismList;
CK_C_GetMechanismInfo C_GetMechanismInfo;
CK_C_InitToken C_InitToken;
CK_C_InitPIN C_InitPIN;
CK_C_SetPIN C_SetPIN;
CK_C_OpenSession C_OpenSession;
CK_C_CloseSession C_CloseSession;
CK_C_CloseAllSessions C_CloseAllSessions;
CK_C_GetSessionInfo C_GetSessionInfo;
CK_C_GetOperationState C_GetOperationState;
CK_C_SetOperationState C_SetOperationState;
CK_C_Login C_Login;
CK_C_Logout C_Logout;
CK_C_CreateObject C_CreateObject;
CK_C_CopyObject C_CopyObject;
CK_C_DestroyObject C_DestroyObject;
CK_C_GetObjectSize C_GetObjectSize;
CK_C_GetAttributeValue C_GetAttributeValue;
CK_C_SetAttributeValue C_SetAttributeValue;
CK_C_FindObjectsInit C_FindObjectsInit;
CK_C_FindObjects C_FindObjects;
CK_C_FindObjectsFinal C_FindObjectsFinal;
CK_C_EncryptInit C_EncryptInit;
CK_C_Encrypt C_Encrypt;
CK_C_EncryptUpdate C_EncryptUpdate;
CK_C_EncryptFinal C_EncryptFinal;
CK_C_DecryptInit C_DecryptInit;
CK_C_Decrypt C_Decrypt;
CK_C_DecryptUpdate C_DecryptUpdate;
CK_C_DecryptFinal C_DecryptFinal;
CK_C_DigestInit C_DigestInit;
CK_C_Digest C_Digest;
CK_C_DigestUpdate C_DigestUpdate;
CK_C_DigestKey C_DigestKey;
CK_C_DigestFinal C_DigestFinal;
CK_C_SignInit C_SignInit;
CK_C_Sign C_Sign;
CK_C_SignUpdate C_SignUpdate;
CK_C_SignFinal C_SignFinal;
CK_C_SignRecoverInit C_SignRecoverInit;
CK_C_SignRecover C_SignRecover;
CK_C_VerifyInit C_VerifyInit;
CK_C_Verify C_Verify;
CK_C_VerifyUpdate C_VerifyUpdate;
CK_C_VerifyFinal C_VerifyFinal;
CK_C_VerifyRecoverInit C_VerifyRecoverInit;
CK_C_VerifyRecover C_VerifyRecover;
CK_C_DigestEncryptUpdate C_DigestEncryptUpdate;
CK_C_DecryptDigestUpdate C_DecryptDigestUpdate;
CK_C_SignEncryptUpdate C_SignEncryptUpdate;
CK_C_DecryptVerifyUpdate C_DecryptVerifyUpdate;
CK_C_GenerateKey C_GenerateKey;
CK_C_GenerateKeyPair C_GenerateKeyPair;
CK_C_WrapKey C_WrapKey;
CK_C_UnwrapKey C_UnwrapKey;
CK_C_DeriveKey C_DeriveKey;
CK_C_SeedRandom C_SeedRandom;
CK_C_GenerateRandom C_GenerateRandom;
CK_C_GetFunctionStatus C_GetFunctionStatus;
CK_C_CancelFunction C_CancelFunction;
CK_C_WaitForSlotEvent C_WaitForSlotEvent;
};
typedef unsigned char CK_BYTE;
typedef unsigned char CK_UTF8CHAR;
typedef CK_BYTE *CK_BYTE_PTR;
typedef CK_OBJECT_HANDLE *CK_OBJECT_HANDLE_PTR;
typedef struct _CK_ATTRIBUTE CK_ATTRIBUTE;
typedef struct _CK_ATTRIBUTE *CK_ATTRIBUTE_PTR;
typedef struct _CK_MECHANISM CK_MECHANISM;
typedef struct _CK_FUNCTION_LIST *CK_FUNCTION_LIST_PTR;
/* p11-kit/uri.h */
typedef enum {
DUMMY /* ..., */
} P11KitUriType;
typedef ... P11KitUri;
CK_ATTRIBUTE_PTR p11_kit_uri_get_attributes (P11KitUri *uri,
CK_ULONG *n_attrs);
int p11_kit_uri_any_unrecognized (P11KitUri *uri);
P11KitUri* p11_kit_uri_new (void);
int p11_kit_uri_parse (const char *string,
P11KitUriType uri_type,
P11KitUri *uri);
void p11_kit_uri_free (P11KitUri *uri);
/* p11helper.c */
struct ck_rsa_pkcs_oaep_params {
CK_MECHANISM_TYPE hash_alg;
unsigned long mgf;
unsigned long source;
void *source_data;
unsigned long source_data_len;
};
typedef struct ck_rsa_pkcs_oaep_params CK_RSA_PKCS_OAEP_PARAMS;
''')
_libp11_kit = _ffi.dlopen(ctypes.util.find_library('p11-kit'))
# utility
NULL = _ffi.NULL
unsigned_char = _ffi.typeof('unsigned char')
unsigned_long = _ffi.typeof('unsigned long')
sizeof = _ffi.sizeof
def new_ptr(ctype, *args):
return _ffi.new(_ffi.getctype(ctype, '*'), *args)
def new_array(ctype, *args):
return _ffi.new(_ffi.getctype(ctype, '[]'), *args)
# p11-kit/pkcs11.h
CK_SESSION_HANDLE = _ffi.typeof('CK_SESSION_HANDLE')
CK_OBJECT_HANDLE = _ffi.typeof('CK_OBJECT_HANDLE')
CKU_USER = 1
CKF_RW_SESSION = 0x2
CKF_SERIAL_SESSION = 0x4
CK_OBJECT_CLASS = _ffi.typeof('CK_OBJECT_CLASS')
CKO_PUBLIC_KEY = 2
CKO_PRIVATE_KEY = 3
CKO_SECRET_KEY = 4
CKO_VENDOR_DEFINED = 0x80000000
CK_KEY_TYPE = _ffi.typeof('CK_KEY_TYPE')
CKK_RSA = 0
CKK_AES = 0x1f
CKA_CLASS = 0
CKA_TOKEN = 1
CKA_PRIVATE = 2
CKA_LABEL = 3
CKA_TRUSTED = 0x86
CKA_KEY_TYPE = 0x100
CKA_ID = 0x102
CKA_SENSITIVE = 0x103
CKA_ENCRYPT = 0x104
CKA_DECRYPT = 0x105
CKA_WRAP = 0x106
CKA_UNWRAP = 0x107
CKA_SIGN = 0x108
CKA_SIGN_RECOVER = 0x109
CKA_VERIFY = 0x10a
CKA_VERIFY_RECOVER = 0x10b
CKA_DERIVE = 0x10c
CKA_MODULUS = 0x120
CKA_MODULUS_BITS = 0x121
CKA_PUBLIC_EXPONENT = 0x122
CKA_VALUE_LEN = 0x161
CKA_EXTRACTABLE = 0x162
CKA_LOCAL = 0x163
CKA_NEVER_EXTRACTABLE = 0x164
CKA_ALWAYS_SENSITIVE = 0x165
CKA_MODIFIABLE = 0x170
CKA_ALWAYS_AUTHENTICATE = 0x202
CKA_WRAP_WITH_TRUSTED = 0x210
CKM_RSA_PKCS_KEY_PAIR_GEN = 0
CKM_RSA_PKCS = 1
CKM_RSA_PKCS_OAEP = 9
CKM_SHA_1 = 0x220
CKM_AES_KEY_GEN = 0x1080
CKR_OK = 0
CKR_ATTRIBUTE_TYPE_INVALID = 0x12
CKR_USER_NOT_LOGGED_IN = 0x101
CKR_BUFFER_TOO_SMALL = 0x150
CK_BYTE = _ffi.typeof('CK_BYTE')
CK_BBOOL = _ffi.typeof('CK_BBOOL')
CK_ULONG = _ffi.typeof('CK_ULONG')
CK_BYTE_PTR = _ffi.typeof('CK_BYTE_PTR')
CK_FALSE = 0
CK_TRUE = 1
CK_OBJECT_HANDLE_PTR = _ffi.typeof('CK_OBJECT_HANDLE_PTR')
CK_ATTRIBUTE = _ffi.typeof('CK_ATTRIBUTE')
CK_MECHANISM = _ffi.typeof('CK_MECHANISM')
CK_FUNCTION_LIST_PTR = _ffi.typeof('CK_FUNCTION_LIST_PTR')
CK_SLOT_ID = _ffi.typeof('CK_SLOT_ID')
CK_TOKEN_INFO = _ffi.typeof('CK_TOKEN_INFO')
NULL_PTR = NULL
# p11-kit/uri.h
P11_KIT_URI_OK = 0
P11_KIT_URI_FOR_OBJECT = 2
p11_kit_uri_get_attributes = _libp11_kit.p11_kit_uri_get_attributes
p11_kit_uri_any_unrecognized = _libp11_kit.p11_kit_uri_any_unrecognized
p11_kit_uri_new = _libp11_kit.p11_kit_uri_new
p11_kit_uri_parse = _libp11_kit.p11_kit_uri_parse
p11_kit_uri_free = _libp11_kit.p11_kit_uri_free
# library.c
def loadLibrary(module):
"""Load the PKCS#11 library"""
# Load PKCS #11 library
if module:
# pylint: disable=no-member
pDynLib = _ffi.dlopen(module, _ffi.RTLD_NOW | _ffi.RTLD_LOCAL)
else:
raise Error("PKCS#11 module name is empty")
# Retrieve the entry point for C_GetFunctionList
pGetFunctionList = pDynLib.C_GetFunctionList
if pGetFunctionList == NULL:
raise Error(
f"Module '{module}' has no function 'C_GetFunctionList'."
)
# Store the handle so we can dlclose it later
return pGetFunctionList, pDynLib
# p11helper.c
# compat TODO
CKM_AES_KEY_WRAP = 0x2109
CKM_AES_KEY_WRAP_PAD = 0x210a
# TODO
CKA_COPYABLE = 0x0017
CKG_MGF1_SHA1 = 0x00000001
CKZ_DATA_SPECIFIED = 0x00000001
CK_RSA_PKCS_OAEP_PARAMS = _ffi.typeof('CK_RSA_PKCS_OAEP_PARAMS')
true_ptr = new_ptr(CK_BBOOL, CK_TRUE)
false_ptr = new_ptr(CK_BBOOL, CK_FALSE)
MAX_TEMPLATE_LEN = 32
#
# Constants
#
CONST_RSA_PKCS_OAEP_PARAMS_ptr = new_ptr(CK_RSA_PKCS_OAEP_PARAMS, dict(
hash_alg=CKM_SHA_1,
mgf=CKG_MGF1_SHA1,
source=CKZ_DATA_SPECIFIED,
source_data=NULL,
source_data_len=0,
))
#
# ipap11helper Exceptions
#
class P11HelperException(Exception):
"""parent class for all exceptions"""
P11HelperException.__name__ = 'Exception'
class Error(P11HelperException):
"""general error"""
class NotFound(P11HelperException):
"""key not found"""
class DuplicationError(P11HelperException):
"""key already exists"""
########################################################################
# Support functions
#
def pyobj_to_bool(pyobj):
if pyobj:
return true_ptr
return false_ptr
def convert_py2bool(mapping):
return tuple(pyobj_to_bool(py_obj) for py_obj in mapping)
def string_to_pybytes_or_none(str, len):
if str == NULL:
return None
return _ffi.buffer(str, len)[:]
def unicode_to_char_array(unicode):
"""
Convert a unicode string to the utf8 encoded char array
:param unicode: input python unicode object
"""
try:
utf8_str = unicode.encode('utf-8')
except Exception:
raise Error("Unable to encode UTF-8")
try:
result = new_array(unsigned_char, utf8_str)
except Exception:
raise Error("Unable to get bytes from string")
l = len(utf8_str)
return result, l
def char_array_to_unicode(array, l):
"""
Convert utf-8 encoded char array to unicode object
"""
return _ffi.buffer(array, l)[:].decode('utf-8')
def int_to_bytes(value):
try:
return binascii.unhexlify('{0:x}'.format(value))
except (TypeError, binascii.Error):
return binascii.unhexlify('0{0:x}'.format(value))
def bytes_to_int(value):
return int(binascii.hexlify(value), 16)
def check_return_value(rv, message):
"""
Tests result value of pkc11 operations
"""
if rv != CKR_OK:
try:
errmsg = "Error at %s: 0x%x\n" % (message, rv)
except Exception:
raise Error("An error occured during error message generation. "
"Please report this problem. Developers will use "
"a crystal ball to find out the root cause.")
else:
raise Error(errmsg)
def _fill_template_from_parts(attr, template_len, id, id_len, label, label_len,
class_, cka_wrap, cka_unwrap):
"""
Fill template structure with pointers to attributes passed as independent
variables.
Variables with NULL values will be omitted from template.
@warning input variables should not be modified when template is in use
"""
cnt = 0
if label != NULL:
attr[0].type = CKA_LABEL
attr[0].pValue = label
attr[0].ulValueLen = label_len
attr += 1
cnt += 1
assert cnt < template_len[0]
if id != NULL:
attr[0].type = CKA_ID
attr[0].pValue = id
attr[0].ulValueLen = id_len
attr += 1
cnt += 1
assert cnt < template_len[0]
if cka_wrap != NULL:
attr[0].type = CKA_WRAP
attr[0].pValue = cka_wrap
attr[0].ulValueLen = sizeof(CK_BBOOL)
attr += 1
cnt += 1
assert cnt < template_len[0]
if cka_unwrap != NULL:
attr[0].type = CKA_UNWRAP
attr[0].pValue = cka_unwrap
attr[0].ulValueLen = sizeof(CK_BBOOL)
attr += 1
cnt += 1
assert cnt < template_len[0]
if class_ != NULL:
attr[0].type = CKA_CLASS
attr[0].pValue = class_
attr[0].ulValueLen = sizeof(CK_OBJECT_CLASS)
attr += 1
cnt += 1
assert cnt < template_len[0]
template_len[0] = cnt
def _parse_uri(uri_str):
"""
Parse string to P11-kit representation of PKCS#11 URI.
"""
uri = p11_kit_uri_new()
if not uri:
raise Error("Cannot initialize URI parser")
try:
result = p11_kit_uri_parse(uri_str, P11_KIT_URI_FOR_OBJECT, uri)
if result != P11_KIT_URI_OK:
raise Error("Cannot parse URI")
if p11_kit_uri_any_unrecognized(uri):
raise Error("PKCS#11 URI contains unsupported attributes")
except Error:
p11_kit_uri_free(uri)
raise
return uri
def _set_wrapping_mech_parameters(mech_type, mech):
"""
Function set default param values for wrapping mechanism
:param mech_type: mechanism type
:param mech: filled structure with params based on mech type
Warning: do not dealloc param values, it is static variables
"""
if mech_type in (CKM_RSA_PKCS, CKM_AES_KEY_WRAP, CKM_AES_KEY_WRAP_PAD):
mech.pParameter = NULL
mech.ulParameterLen = 0
elif mech_type == CKM_RSA_PKCS_OAEP:
# Use the same configuration as openSSL
# https://www.openssl.org/docs/crypto/RSA_public_encrypt.html
mech.pParameter = CONST_RSA_PKCS_OAEP_PARAMS_ptr
mech.ulParameterLen = sizeof(CK_RSA_PKCS_OAEP_PARAMS)
else:
raise Error("Unsupported wrapping mechanism")
mech.mechanism = mech_type
########################################################################
# P11_Helper object
#
class P11_Helper:
@property
def p11(self):
return self.p11_ptr[0]
@property
def session(self):
return self.session_ptr[0]
def _find_key(self, template, template_len):
"""
Find keys matching specified template.
Function returns list of key handles via objects parameter.
:param template: PKCS#11 template for attribute matching
"""
result_objects = []
result_object_ptr = new_ptr(CK_OBJECT_HANDLE)
objectCount_ptr = new_ptr(CK_ULONG)
rv = self.p11.C_FindObjectsInit(self.session, template, template_len)
check_return_value(rv, "Find key init")
rv = self.p11.C_FindObjects(self.session, result_object_ptr, 1,
objectCount_ptr)
check_return_value(rv, "Find key")
while objectCount_ptr[0] > 0:
result_objects.append(result_object_ptr[0])
rv = self.p11.C_FindObjects(self.session, result_object_ptr, 1,
objectCount_ptr)
check_return_value(rv, "Check for duplicated key")
rv = self.p11.C_FindObjectsFinal(self.session)
check_return_value(rv, "Find objects final")
return result_objects
def _id_exists(self, id, id_len, class_):
"""
Test if object with specified label, id and class exists
:param id: key ID, (if value is NULL, will not be used to find key)
:param id_len: key ID length
:param class_ key: class
:return: True if object was found, False if object doesnt exists
"""
object_count_ptr = new_ptr(CK_ULONG)
result_object_ptr = new_ptr(CK_OBJECT_HANDLE)
class_ptr = new_ptr(CK_OBJECT_CLASS, class_)
class_sec_ptr = new_ptr(CK_OBJECT_CLASS, CKO_SECRET_KEY)
template_pub_priv = new_array(CK_ATTRIBUTE, (
(CKA_ID, id, id_len),
(CKA_CLASS, class_ptr, sizeof(CK_OBJECT_CLASS)),
))
template_sec = new_array(CK_ATTRIBUTE, (
(CKA_ID, id, id_len),
(CKA_CLASS, class_sec_ptr, sizeof(CK_OBJECT_CLASS)),
))
template_id = new_array(CK_ATTRIBUTE, (
(CKA_ID, id, id_len),
))
#
# Only one secret key with same ID is allowed
#
if class_ == CKO_SECRET_KEY:
rv = self.p11.C_FindObjectsInit(self.session, template_id, 1)
check_return_value(rv, "id, label exists init")
rv = self.p11.C_FindObjects(self.session, result_object_ptr, 1,
object_count_ptr)
check_return_value(rv, "id, label exists")
rv = self.p11.C_FindObjectsFinal(self.session)
check_return_value(rv, "id, label exists final")
if object_count_ptr[0] > 0:
return True
return False
#
# Public and private keys can share one ID, but
#
# test if secret key with same ID exists
rv = self.p11.C_FindObjectsInit(self.session, template_sec, 2)
check_return_value(rv, "id, label exists init")
rv = self.p11.C_FindObjects(self.session, result_object_ptr, 1,
object_count_ptr)
check_return_value(rv, "id, label exists")
rv = self.p11.C_FindObjectsFinal(self.session)
check_return_value(rv, "id, label exists final")
if object_count_ptr[0] > 0:
# object found
return True
# test if pub/private key with same id exists
object_count_ptr[0] = 0
rv = self.p11.C_FindObjectsInit(self.session, template_pub_priv, 2)
check_return_value(rv, "id, label exists init")
rv = self.p11.C_FindObjects(self.session, result_object_ptr, 1,
object_count_ptr)
check_return_value(rv, "id, label exists")
rv = self.p11.C_FindObjectsFinal(self.session)
check_return_value(rv, "id, label exists final")
if object_count_ptr[0] > 0:
# Object found
return True
# Object not found
return False
def __init__(self, token_label, user_pin, library_path):
self.p11_ptr = new_ptr(CK_FUNCTION_LIST_PTR)
self.session_ptr = new_ptr(CK_SESSION_HANDLE)
self.session_ptr[0] = 0
self.p11_ptr[0] = NULL
self.module_handle = None
# Parse method args
if isinstance(user_pin, unicode):
user_pin = user_pin.encode()
self.token_label = token_label
try:
pGetFunctionList, module_handle = loadLibrary(library_path)
except Exception:
raise Error(f"Could not load the library '{library_path}'.")
self.module_handle = module_handle
#
# Load the function list
#
pGetFunctionList(self.p11_ptr)
#
# Initialize
#
rv = self.p11.C_Initialize(NULL)
check_return_value(rv, "initialize")
#
# Get Slot
#
slot = self.get_slot()
if slot is None:
raise Error("No slot for label {} found".format(self.token_label))
#
# Start session
#
rv = self.p11.C_OpenSession(slot,
CKF_SERIAL_SESSION | CKF_RW_SESSION, NULL,
NULL, self.session_ptr)
check_return_value(rv, "open session")
#
# Login
#
rv = self.p11.C_Login(self.session, CKU_USER, user_pin, len(user_pin))
check_return_value(rv, "log in")
def get_slot(self):
"""Get slot where then token is located
:return: slot number or None when slot not found
"""
object_count_ptr = new_ptr(CK_ULONG)
# get slots ID
slots = None
for _i in range(0, 10):
# try max N times, then die to avoid infinite iteration
rv = self.p11.C_GetSlotList(CK_TRUE, NULL, object_count_ptr)
check_return_value(rv, "get slots IDs - prepare")
result_ids_ptr = new_array(CK_SLOT_ID, object_count_ptr[0])
rv = self.p11.C_GetSlotList(
CK_TRUE, result_ids_ptr, object_count_ptr)
if rv == CKR_BUFFER_TOO_SMALL:
continue
check_return_value(rv, "get slots IDs")
slots = result_ids_ptr
break # we have slots !!!
if slots is None:
raise Error("Failed to get slots")
for slot in slots:
token_info_ptr = new_ptr(CK_TOKEN_INFO)
rv = self.p11.C_GetTokenInfo(slot, token_info_ptr)
check_return_value(rv, 'get token info')
# softhsm always returns label 32 bytes long with padding made of
# whitespaces (#32), so we have to rstrip() padding and compare
# Label was created by softhsm-util so it is not our fault that
# there are #32 as padding (cffi initializes structures with
# zeroes)
# In case that this is not valid anymore, keep in mind backward
# compatibility
if self.token_label == char_array_to_unicode(
token_info_ptr[0].label, 32).rstrip():
return slot
return None
def finalize(self):
"""
Finalize operations with pkcs11 library
"""
if self.p11 == NULL:
return
#
# Logout
#
rv = self.p11.C_Logout(self.session)
check_return_value(rv, "log out")
#
# End session
#
rv = self.p11.C_CloseSession(self.session)
check_return_value(rv, "close session")
#
# Finalize
#
self.p11.C_Finalize(NULL)
self.p11_ptr[0] = NULL
self.session_ptr[0] = 0
self.module_handle = None
#################################################################
# Methods working with keys
#
def generate_master_key(self, label, id, key_length=16, cka_copyable=True,
cka_decrypt=False, cka_derive=False,
cka_encrypt=False, cka_extractable=True,
cka_modifiable=True, cka_private=True,
cka_sensitive=True, cka_sign=False,
cka_unwrap=True, cka_verify=False, cka_wrap=True,
cka_wrap_with_trusted=False):
"""
Generate master key
:return: master key handle
"""
if isinstance(id, unicode):
id = id.encode()
attrs = (
cka_copyable,
cka_decrypt,
cka_derive,
cka_encrypt,
cka_extractable,
cka_modifiable,
cka_private,
cka_sensitive,
cka_sign,
cka_unwrap,
cka_verify,
cka_wrap,
cka_wrap_with_trusted,
)
key_length_ptr = new_ptr(CK_ULONG, key_length)
master_key_ptr = new_ptr(CK_OBJECT_HANDLE)
label_unicode = label
id_length = len(id)
id_ = new_array(CK_BYTE, id)
# TODO check long overflow
label, label_length = unicode_to_char_array(label_unicode)
# TODO param?
mechanism_ptr = new_ptr(CK_MECHANISM, (
CKM_AES_KEY_GEN, NULL_PTR, 0
))
if key_length not in (16, 24, 32):
raise Error("generate_master_key: key length allowed values are: "
"16, 24 and 32")
if self._id_exists(id_, id_length, CKO_SECRET_KEY):
raise DuplicationError("Master key with same ID already exists")
# Process keyword boolean arguments
(_cka_copyable_ptr, cka_decrypt_ptr, cka_derive_ptr, cka_encrypt_ptr,
cka_extractable_ptr, cka_modifiable_ptr, cka_private_ptr,
cka_sensitive_ptr, cka_sign_ptr, cka_unwrap_ptr, cka_verify_ptr,
cka_wrap_ptr, cka_wrap_with_trusted_ptr,) = convert_py2bool(attrs)
symKeyTemplate = new_array(CK_ATTRIBUTE, (
(CKA_ID, id_, id_length),
(CKA_LABEL, label, label_length),
(CKA_TOKEN, true_ptr, sizeof(CK_BBOOL)),
(CKA_VALUE_LEN, key_length_ptr, sizeof(CK_ULONG)),
# TODO Softhsm doesn't support it
# (CKA_COPYABLE, cka_copyable_ptr, sizeof(CK_BBOOL)),
(CKA_DECRYPT, cka_decrypt_ptr, sizeof(CK_BBOOL)),
(CKA_DERIVE, cka_derive_ptr, sizeof(CK_BBOOL)),
(CKA_ENCRYPT, cka_encrypt_ptr, sizeof(CK_BBOOL)),
(CKA_EXTRACTABLE, cka_extractable_ptr, sizeof(CK_BBOOL)),
(CKA_MODIFIABLE, cka_modifiable_ptr, sizeof(CK_BBOOL)),
(CKA_PRIVATE, cka_private_ptr, sizeof(CK_BBOOL)),
(CKA_SENSITIVE, cka_sensitive_ptr, sizeof(CK_BBOOL)),
(CKA_SIGN, cka_sign_ptr, sizeof(CK_BBOOL)),
(CKA_UNWRAP, cka_unwrap_ptr, sizeof(CK_BBOOL)),
(CKA_VERIFY, cka_verify_ptr, sizeof(CK_BBOOL)),
(CKA_WRAP, cka_wrap_ptr, sizeof(CK_BBOOL)),
(CKA_WRAP_WITH_TRUSTED, cka_wrap_with_trusted_ptr,
sizeof(CK_BBOOL)),
))
rv = self.p11.C_GenerateKey(self.session, mechanism_ptr,
symKeyTemplate,
(sizeof(symKeyTemplate) //
sizeof(CK_ATTRIBUTE)), master_key_ptr)
check_return_value(rv, "generate master key")
return master_key_ptr[0]
def generate_replica_key_pair(self, label, id, modulus_bits=2048,
pub_cka_copyable=True, pub_cka_derive=False,
pub_cka_encrypt=False,
pub_cka_modifiable=True,
pub_cka_private=True, pub_cka_trusted=False,
pub_cka_verify=False,
pub_cka_verify_recover=False,
pub_cka_wrap=True,
priv_cka_always_authenticate=False,
priv_cka_copyable=True,
priv_cka_decrypt=False,
priv_cka_derive=False,
priv_cka_extractable=False,
priv_cka_modifiable=True,
priv_cka_private=True,
priv_cka_sensitive=True,
priv_cka_sign=False,
priv_cka_sign_recover=False,
priv_cka_unwrap=True,
priv_cka_wrap_with_trusted=False):
"""
Generate replica keys
:returns: tuple (public_key_handle, private_key_handle)
"""
if isinstance(id, unicode):
id = id.encode()
attrs_pub = (
pub_cka_copyable,
pub_cka_derive,
pub_cka_encrypt,
pub_cka_modifiable,
pub_cka_private,
pub_cka_trusted,
pub_cka_verify,
pub_cka_verify_recover,
pub_cka_wrap,
)
attrs_priv = (
priv_cka_always_authenticate,
priv_cka_copyable,
priv_cka_decrypt,
priv_cka_derive,
priv_cka_extractable,
priv_cka_modifiable,
priv_cka_private,
priv_cka_sensitive,
priv_cka_sign,
priv_cka_sign_recover,
priv_cka_unwrap,
priv_cka_wrap_with_trusted,
)
label_unicode = label
id_ = new_array(CK_BYTE, id)
id_length = len(id)
label, label_length = unicode_to_char_array(label_unicode)
public_key_ptr = new_ptr(CK_OBJECT_HANDLE)
private_key_ptr = new_ptr(CK_OBJECT_HANDLE)
mechanism_ptr = new_ptr(CK_MECHANISM,
(CKM_RSA_PKCS_KEY_PAIR_GEN, NULL_PTR, 0))
if self._id_exists(id_, id_length, CKO_PRIVATE_KEY):
raise DuplicationError("Private key with same ID already exists")
if self._id_exists(id_, id_length, CKO_PUBLIC_KEY):
raise DuplicationError("Public key with same ID already exists")
modulus_bits_ptr = new_ptr(CK_ULONG, modulus_bits)
# Process keyword boolean arguments
(_pub_cka_copyable_ptr, pub_cka_derive_ptr, pub_cka_encrypt_ptr,
pub_cka_modifiable_ptr, pub_cka_private_ptr, pub_cka_trusted_ptr,
pub_cka_verify_ptr, pub_cka_verify_recover_ptr, pub_cka_wrap_ptr,
) = convert_py2bool(attrs_pub)
(priv_cka_always_authenticate_ptr, _priv_cka_copyable_ptr,
priv_cka_decrypt_ptr, priv_cka_derive_ptr, priv_cka_extractable_ptr,
priv_cka_modifiable_ptr, priv_cka_private_ptr, priv_cka_sensitive_ptr,
priv_cka_sign_ptr, _priv_cka_sign_recover_ptr, priv_cka_unwrap_ptr,
priv_cka_wrap_with_trusted_ptr,) = convert_py2bool(attrs_priv)
# 65537 (RFC 6376 section 3.3.1)
public_exponent = new_array(CK_BYTE, (1, 0, 1))
publicKeyTemplate = new_array(CK_ATTRIBUTE, (
(CKA_ID, id_, id_length),
(CKA_LABEL, label, label_length),
(CKA_TOKEN, true_ptr, sizeof(CK_BBOOL)),
(CKA_MODULUS_BITS, modulus_bits_ptr, sizeof(CK_ULONG)),
(CKA_PUBLIC_EXPONENT, public_exponent, 3),
# TODO Softhsm doesn't support it
# (CKA_COPYABLE, pub_cka_copyable_p, sizeof(CK_BBOOL)),
(CKA_DERIVE, pub_cka_derive_ptr, sizeof(CK_BBOOL)),
(CKA_ENCRYPT, pub_cka_encrypt_ptr, sizeof(CK_BBOOL)),
(CKA_MODIFIABLE, pub_cka_modifiable_ptr, sizeof(CK_BBOOL)),
(CKA_PRIVATE, pub_cka_private_ptr, sizeof(CK_BBOOL)),
(CKA_TRUSTED, pub_cka_trusted_ptr, sizeof(CK_BBOOL)),
(CKA_VERIFY, pub_cka_verify_ptr, sizeof(CK_BBOOL)),
(CKA_VERIFY_RECOVER, pub_cka_verify_recover_ptr, sizeof(CK_BBOOL)),
(CKA_WRAP, pub_cka_wrap_ptr, sizeof(CK_BBOOL)),
))
privateKeyTemplate = new_array(CK_ATTRIBUTE, (
(CKA_ID, id_, id_length),
(CKA_LABEL, label, label_length),
(CKA_TOKEN, true_ptr, sizeof(CK_BBOOL)),
(CKA_ALWAYS_AUTHENTICATE, priv_cka_always_authenticate_ptr,
sizeof(CK_BBOOL)),
# TODO Softhsm doesn't support it
# (CKA_COPYABLE, priv_cka_copyable_ptr, sizeof(CK_BBOOL)),
(CKA_DECRYPT, priv_cka_decrypt_ptr, sizeof(CK_BBOOL)),
(CKA_DERIVE, priv_cka_derive_ptr, sizeof(CK_BBOOL)),
(CKA_EXTRACTABLE, priv_cka_extractable_ptr, sizeof(CK_BBOOL)),
(CKA_MODIFIABLE, priv_cka_modifiable_ptr, sizeof(CK_BBOOL)),
(CKA_PRIVATE, priv_cka_private_ptr, sizeof(CK_BBOOL)),
(CKA_SENSITIVE, priv_cka_sensitive_ptr, sizeof(CK_BBOOL)),
(CKA_SIGN, priv_cka_sign_ptr, sizeof(CK_BBOOL)),
(CKA_SIGN_RECOVER, priv_cka_sign_ptr, sizeof(CK_BBOOL)),
(CKA_UNWRAP, priv_cka_unwrap_ptr, sizeof(CK_BBOOL)),
(CKA_WRAP_WITH_TRUSTED, priv_cka_wrap_with_trusted_ptr,
sizeof(CK_BBOOL)),
))
rv = self.p11.C_GenerateKeyPair(self.session, mechanism_ptr,
publicKeyTemplate,
(sizeof(publicKeyTemplate) //
sizeof(CK_ATTRIBUTE)),
privateKeyTemplate,
(sizeof(privateKeyTemplate) //
sizeof(CK_ATTRIBUTE)),
public_key_ptr,
private_key_ptr)
check_return_value(rv, "generate key pair")
return public_key_ptr[0], private_key_ptr[0]
def find_keys(self, objclass=CKO_VENDOR_DEFINED, label=None, id=None,
cka_wrap=None, cka_unwrap=None, uri=None):
"""
Find key
"""
if isinstance(id, unicode):
id = id.encode()
if isinstance(uri, unicode):
uri = uri.encode()
class_ = objclass
class_ptr = new_ptr(CK_OBJECT_CLASS, class_)
ckawrap = NULL
ckaunwrap = NULL
if id is not None:
id_ = new_array(CK_BYTE, id)
id_length = len(id)
else:
id_ = NULL
id_length = 0
label_unicode, label = label, NULL
cka_wrap_bool = cka_wrap
cka_unwrap_bool = cka_unwrap
label_length = 0
uri_str = uri
uri = NULL
template = new_array(CK_ATTRIBUTE, MAX_TEMPLATE_LEN)
template_len_ptr = new_ptr(CK_ULONG, MAX_TEMPLATE_LEN)
# TODO check long overflow
if label_unicode is not None:
label, label_length = unicode_to_char_array(label_unicode)
if cka_wrap_bool is not None:
if cka_wrap_bool:
ckawrap = true_ptr
else:
ckawrap = false_ptr
if cka_unwrap_bool is not None:
if cka_unwrap_bool:
ckaunwrap = true_ptr
else:
ckaunwrap = false_ptr
if class_ == CKO_VENDOR_DEFINED:
class_ptr = NULL
try:
if uri_str is None:
_fill_template_from_parts(template, template_len_ptr, id_,
id_length, label, label_length,
class_ptr, ckawrap, ckaunwrap)
else:
uri = _parse_uri(uri_str)
template = (p11_kit_uri_get_attributes(uri, template_len_ptr))
# Do not deallocate URI while you are using the template.
# Template contains pointers to values inside URI!
result_list = self._find_key(template, template_len_ptr[0])
return result_list
finally:
if uri != NULL:
p11_kit_uri_free(uri)
def delete_key(self, key_handle):
"""
delete key
"""
# TODO check long overflow
rv = self.p11.C_DestroyObject(self.session, key_handle)
check_return_value(rv, "object deletion")
def _export_RSA_public_key(self, object):
"""
export RSA public key
"""
class_ptr = new_ptr(CK_OBJECT_CLASS, CKO_PUBLIC_KEY)
key_type_ptr = new_ptr(CK_KEY_TYPE, CKK_RSA)
obj_template = new_array(CK_ATTRIBUTE, (
(CKA_MODULUS, NULL_PTR, 0),
(CKA_PUBLIC_EXPONENT, NULL_PTR, 0),
(CKA_CLASS, class_ptr, sizeof(CK_OBJECT_CLASS)),
(CKA_KEY_TYPE, key_type_ptr, sizeof(CK_KEY_TYPE)),
))
rv = self.p11.C_GetAttributeValue(self.session, object, obj_template,
(sizeof(obj_template) //
sizeof(CK_ATTRIBUTE)))
check_return_value(rv, "get RSA public key values - prepare")
# Set proper size for attributes
modulus = new_array(CK_BYTE,
obj_template[0].ulValueLen * sizeof(CK_BYTE))
obj_template[0].pValue = modulus
exponent = new_array(CK_BYTE,
obj_template[1].ulValueLen * sizeof(CK_BYTE))
obj_template[1].pValue = exponent
rv = self.p11.C_GetAttributeValue(self.session, object, obj_template,
(sizeof(obj_template) //
sizeof(CK_ATTRIBUTE)))
check_return_value(rv, "get RSA public key values")
# Check if the key is RSA public key
if class_ptr[0] != CKO_PUBLIC_KEY:
raise Error("export_RSA_public_key: required public key class")
if key_type_ptr[0] != CKK_RSA:
raise Error("export_RSA_public_key: required RSA key type")
try:
n = bytes_to_int(string_to_pybytes_or_none(
modulus, obj_template[0].ulValueLen))
except Exception:
raise Error("export_RSA_public_key: internal error: unable to "
"convert modulus")
try:
e = bytes_to_int(string_to_pybytes_or_none(
exponent, obj_template[1].ulValueLen))
except Exception:
raise Error("export_RSA_public_key: internal error: unable to "
"convert exponent")
# set modulus and exponent
rsa_ = rsa.RSAPublicNumbers(e, n)
try:
pkey = rsa_.public_key(default_backend())
except Exception:
raise Error("export_RSA_public_key: internal error: "
"EVP_PKEY_set1_RSA failed")
try:
ret = pkey.public_bytes(
format=serialization.PublicFormat.SubjectPublicKeyInfo,
encoding=serialization.Encoding.DER,
)
except Exception:
ret = None
return ret
def export_public_key(self, key_handle):
"""
Export public key
Export public key in SubjectPublicKeyInfo (RFC5280) DER encoded format
"""
object = key_handle
class_ptr = new_ptr(CK_OBJECT_CLASS, CKO_PUBLIC_KEY)
key_type_ptr = new_ptr(CK_KEY_TYPE, CKK_RSA)
# TODO check long overflow
obj_template = new_array(CK_ATTRIBUTE, (
(CKA_CLASS, class_ptr, sizeof(CK_OBJECT_CLASS)),
(CKA_KEY_TYPE, key_type_ptr, sizeof(CK_KEY_TYPE)),
))
rv = self.p11.C_GetAttributeValue(self.session, object, obj_template,
(sizeof(obj_template) //
sizeof(CK_ATTRIBUTE)))
check_return_value(rv, "export_public_key: get RSA public key values")
if class_ptr[0] != CKO_PUBLIC_KEY:
raise Error("export_public_key: required public key class")
if key_type_ptr[0] == CKK_RSA:
return self._export_RSA_public_key(object)
else:
raise Error("export_public_key: unsupported key type")
def _import_RSA_public_key(self, label, label_length, id, id_length, pkey,
cka_copyable, cka_derive, cka_encrypt,
cka_modifiable, cka_private, cka_trusted,
cka_verify, cka_verify_recover, cka_wrap):
"""
Import RSA public key
"""
class_ptr = new_ptr(CK_OBJECT_CLASS, CKO_PUBLIC_KEY)
keyType_ptr = new_ptr(CK_KEY_TYPE, CKK_RSA)
cka_token = true_ptr
if not isinstance(pkey, rsa.RSAPublicKey):
raise Error("Required RSA public key")
rsa_ = pkey.public_numbers()
# convert BIGNUM to binary array
modulus = new_array(CK_BYTE, int_to_bytes(rsa_.n))
modulus_len = sizeof(modulus) - 1
if modulus_len == 0:
raise Error("import_RSA_public_key: BN_bn2bin modulus error")
exponent = new_array(CK_BYTE, int_to_bytes(rsa_.e))
exponent_len = sizeof(exponent) - 1
if exponent_len == 0:
raise Error("import_RSA_public_key: BN_bn2bin exponent error")
template = new_array(CK_ATTRIBUTE, (
(CKA_ID, id, id_length),
(CKA_CLASS, class_ptr, sizeof(CK_OBJECT_CLASS)),
(CKA_KEY_TYPE, keyType_ptr, sizeof(CK_KEY_TYPE)),
(CKA_TOKEN, cka_token, sizeof(CK_BBOOL)),
(CKA_LABEL, label, label_length),
(CKA_MODULUS, modulus, modulus_len),
(CKA_PUBLIC_EXPONENT, exponent, exponent_len),
# TODO Softhsm doesn't support it
# (CKA_COPYABLE, cka_copyable, sizeof(CK_BBOOL)),
(CKA_DERIVE, cka_derive, sizeof(CK_BBOOL)),
(CKA_ENCRYPT, cka_encrypt, sizeof(CK_BBOOL)),
(CKA_MODIFIABLE, cka_modifiable, sizeof(CK_BBOOL)),
(CKA_PRIVATE, cka_private, sizeof(CK_BBOOL)),
(CKA_TRUSTED, cka_trusted, sizeof(CK_BBOOL)),
(CKA_VERIFY, cka_verify, sizeof(CK_BBOOL)),
(CKA_VERIFY_RECOVER, cka_verify_recover, sizeof(CK_BBOOL)),
(CKA_WRAP, cka_wrap, sizeof(CK_BBOOL)),
))
object_ptr = new_ptr(CK_OBJECT_HANDLE)
rv = self.p11.C_CreateObject(self.session, template,
(sizeof(template) //
sizeof(CK_ATTRIBUTE)), object_ptr)
check_return_value(rv, "create public key object")
return object_ptr[0]
def import_public_key(self, label, id, data, cka_copyable=True,
cka_derive=False, cka_encrypt=False,
cka_modifiable=True, cka_private=True,
cka_trusted=False, cka_verify=True,
cka_verify_recover=True, cka_wrap=False):
"""
Import RSA public key
"""
if isinstance(id, unicode):
id = id.encode()
if isinstance(data, unicode):
data = data.encode()
label_unicode = label
id_ = new_array(CK_BYTE, id)
id_length = len(id)
attrs_pub = (
cka_copyable,
cka_derive,
cka_encrypt,
cka_modifiable,
cka_private,
cka_trusted,
cka_verify,
cka_verify_recover,
cka_wrap,
)
label, label_length = unicode_to_char_array(label_unicode)
if self._id_exists(id_, id_length, CKO_PUBLIC_KEY):
raise DuplicationError("Public key with same ID already exists")
# Process keyword boolean arguments
(cka_copyable_ptr, cka_derive_ptr, cka_encrypt_ptr, cka_modifiable_ptr,
cka_private_ptr, cka_trusted_ptr, cka_verify_ptr,
cka_verify_recover_ptr, cka_wrap_ptr,) = convert_py2bool(attrs_pub)
# decode from ASN1 DER
try:
pkey = serialization.load_der_public_key(data, default_backend())
except Exception:
raise Error("import_public_key: d2i_PUBKEY error")
if isinstance(pkey, rsa.RSAPublicKey):
ret = self._import_RSA_public_key(label, label_length, id_,
id_length, pkey,
cka_copyable_ptr,
cka_derive_ptr,
cka_encrypt_ptr,
cka_modifiable_ptr,
cka_private_ptr,
cka_trusted_ptr,
cka_verify_ptr,
cka_verify_recover_ptr,
cka_wrap_ptr)
elif isinstance(pkey, dsa.DSAPublicKey):
raise Error("DSA is not supported")
elif isinstance(pkey, ec.EllipticCurvePublicKey):
raise Error("EC is not supported")
else:
raise Error("Unsupported key type")
return ret
def export_wrapped_key(self, key, wrapping_key, wrapping_mech):
"""
Export wrapped key
"""
object_key = key
object_wrapping_key = wrapping_key
wrapped_key_len_ptr = new_ptr(CK_ULONG, 0)
wrapping_mech_ptr = new_ptr(CK_MECHANISM, (wrapping_mech, NULL, 0))
# currently we don't support parameter in mechanism
# TODO check long overflow
# TODO export method
# fill mech parameters
_set_wrapping_mech_parameters(wrapping_mech_ptr.mechanism,
wrapping_mech_ptr)
rv = self.p11.C_WrapKey(self.session, wrapping_mech_ptr,
object_wrapping_key, object_key, NULL,
wrapped_key_len_ptr)
check_return_value(rv, "key wrapping: get buffer length")
wrapped_key = new_array(CK_BYTE, wrapped_key_len_ptr[0])
rv = self.p11.C_WrapKey(self.session, wrapping_mech_ptr,
object_wrapping_key, object_key, wrapped_key,
wrapped_key_len_ptr)
check_return_value(rv, "key wrapping: wrapping")
result = string_to_pybytes_or_none(wrapped_key, wrapped_key_len_ptr[0])
return result
def import_wrapped_secret_key(self, label, id, data, unwrapping_key,
wrapping_mech, key_type, cka_copyable=True,
cka_decrypt=False, cka_derive=False,
cka_encrypt=False, cka_extractable=True,
cka_modifiable=True, cka_private=True,
cka_sensitive=True, cka_sign=False,
cka_unwrap=True, cka_verify=False,
cka_wrap=True, cka_wrap_with_trusted=False):
"""
Import wrapped secret key
"""
if isinstance(id, unicode):
id = id.encode()
if isinstance(data, unicode):
data = data.encode()
wrapped_key = new_array(CK_BYTE, data)
wrapped_key_len = len(data)
unwrapping_key_object = unwrapping_key
unwrapped_key_object_ptr = new_ptr(CK_OBJECT_HANDLE, 0)
label_unicode = label
id_ = new_array(CK_BYTE, id)
id_length = len(id)
wrapping_mech_ptr = new_ptr(CK_MECHANISM, (wrapping_mech, NULL, 0))
key_class_ptr = new_ptr(CK_OBJECT_CLASS, CKO_SECRET_KEY)
key_type_ptr = new_ptr(CK_KEY_TYPE, key_type)
attrs = (
cka_copyable,
cka_decrypt,
cka_derive,
cka_encrypt,
cka_extractable,
cka_modifiable,
cka_private,
cka_sensitive,
cka_sign,
cka_unwrap,
cka_verify,
cka_wrap,
cka_wrap_with_trusted,
)
_set_wrapping_mech_parameters(wrapping_mech_ptr.mechanism,
wrapping_mech_ptr)
label, label_length = unicode_to_char_array(label_unicode)
if self._id_exists(id_, id_length, key_class_ptr[0]):
raise DuplicationError("Secret key with same ID already exists")
# Process keyword boolean arguments
(_cka_copyable_ptr, cka_decrypt_ptr, cka_derive_ptr, cka_encrypt_ptr,
cka_extractable_ptr, cka_modifiable_ptr, cka_private_ptr,
cka_sensitive_ptr, cka_sign_ptr, cka_unwrap_ptr, cka_verify_ptr,
cka_wrap_ptr, cka_wrap_with_trusted_ptr,) = convert_py2bool(attrs)
template = new_array(CK_ATTRIBUTE, (
(CKA_CLASS, key_class_ptr, sizeof(CK_OBJECT_CLASS)),
(CKA_KEY_TYPE, key_type_ptr, sizeof(CK_KEY_TYPE)),
(CKA_ID, id_, id_length),
(CKA_LABEL, label, label_length),
(CKA_TOKEN, true_ptr, sizeof(CK_BBOOL)),
# TODO Softhsm doesn't support it
# (CKA_COPYABLE, cka_copyable_ptr, sizeof(CK_BBOOL)),
(CKA_DECRYPT, cka_decrypt_ptr, sizeof(CK_BBOOL)),
(CKA_DERIVE, cka_derive_ptr, sizeof(CK_BBOOL)),
(CKA_ENCRYPT, cka_encrypt_ptr, sizeof(CK_BBOOL)),
(CKA_EXTRACTABLE, cka_extractable_ptr, sizeof(CK_BBOOL)),
(CKA_MODIFIABLE, cka_modifiable_ptr, sizeof(CK_BBOOL)),
(CKA_PRIVATE, cka_private_ptr, sizeof(CK_BBOOL)),
(CKA_SENSITIVE, cka_sensitive_ptr, sizeof(CK_BBOOL)),
(CKA_SIGN, cka_sign_ptr, sizeof(CK_BBOOL)),
(CKA_UNWRAP, cka_unwrap_ptr, sizeof(CK_BBOOL)),
(CKA_VERIFY, cka_verify_ptr, sizeof(CK_BBOOL)),
(CKA_WRAP, cka_wrap_ptr, sizeof(CK_BBOOL)),
(CKA_WRAP_WITH_TRUSTED, cka_wrap_with_trusted_ptr,
sizeof(CK_BBOOL)),
))
rv = self.p11.C_UnwrapKey(self.session, wrapping_mech_ptr,
unwrapping_key_object, wrapped_key,
wrapped_key_len, template,
sizeof(template) // sizeof(CK_ATTRIBUTE),
unwrapped_key_object_ptr)
check_return_value(rv, "import_wrapped_key: key unwrapping")
return unwrapped_key_object_ptr[0]
def import_wrapped_private_key(self, label, id, data, unwrapping_key,
wrapping_mech, key_type,
cka_always_authenticate=False,
cka_copyable=True, cka_decrypt=False,
cka_derive=False, cka_extractable=True,
cka_modifiable=True, cka_private=True,
cka_sensitive=True, cka_sign=True,
cka_sign_recover=True, cka_unwrap=False,
cka_wrap_with_trusted=False):
"""
Import wrapped private key
"""
if isinstance(id, unicode):
id = id.encode()
if isinstance(data, unicode):
data = data.encode()
wrapped_key = new_array(CK_BYTE, data)
wrapped_key_len = len(data)
unwrapping_key_object = unwrapping_key
unwrapped_key_object_ptr = new_ptr(CK_OBJECT_HANDLE, 0)
label_unicode = label
id_ = new_array(CK_BYTE, id)
id_length = len(id)
wrapping_mech_ptr = new_ptr(CK_MECHANISM, (wrapping_mech, NULL, 0))
key_class_ptr = new_ptr(CK_OBJECT_CLASS, CKO_PRIVATE_KEY)
key_type_ptr = new_ptr(CK_KEY_TYPE, key_type)
attrs_priv = (
cka_always_authenticate,
cka_copyable,
cka_decrypt,
cka_derive,
cka_extractable,
cka_modifiable,
cka_private,
cka_sensitive,
cka_sign,
cka_sign_recover,
cka_unwrap,
cka_wrap_with_trusted,
)
label, label_length = unicode_to_char_array(label_unicode)
if self._id_exists(id_, id_length, CKO_SECRET_KEY):
raise DuplicationError("Secret key with same ID already exists")
# Process keyword boolean arguments
(cka_always_authenticate_ptr, _cka_copyable_ptr, cka_decrypt_ptr,
cka_derive_ptr, cka_extractable_ptr, cka_modifiable_ptr,
cka_private_ptr, cka_sensitive_ptr, cka_sign_ptr,
_cka_sign_recover_ptr, cka_unwrap_ptr, cka_wrap_with_trusted_ptr,
) = convert_py2bool(attrs_priv)
template = new_array(CK_ATTRIBUTE, (
(CKA_CLASS, key_class_ptr, sizeof(CK_OBJECT_CLASS)),
(CKA_KEY_TYPE, key_type_ptr, sizeof(CK_KEY_TYPE)),
(CKA_ID, id_, id_length),
(CKA_LABEL, label, label_length),
(CKA_TOKEN, true_ptr, sizeof(CK_BBOOL)),
(CKA_ALWAYS_AUTHENTICATE, cka_always_authenticate_ptr,
sizeof(CK_BBOOL)),
# TODO Softhsm doesn't support it
# (CKA_COPYABLE, cka_copyable_ptr, sizeof(CK_BBOOL)),
(CKA_DECRYPT, cka_decrypt_ptr, sizeof(CK_BBOOL)),
(CKA_DERIVE, cka_derive_ptr, sizeof(CK_BBOOL)),
(CKA_EXTRACTABLE, cka_extractable_ptr, sizeof(CK_BBOOL)),
(CKA_MODIFIABLE, cka_modifiable_ptr, sizeof(CK_BBOOL)),
(CKA_PRIVATE, cka_private_ptr, sizeof(CK_BBOOL)),
(CKA_SENSITIVE, cka_sensitive_ptr, sizeof(CK_BBOOL)),
(CKA_SIGN, cka_sign_ptr, sizeof(CK_BBOOL)),
(CKA_SIGN_RECOVER, cka_sign_ptr, sizeof(CK_BBOOL)),
(CKA_UNWRAP, cka_unwrap_ptr, sizeof(CK_BBOOL)),
(CKA_WRAP_WITH_TRUSTED, cka_wrap_with_trusted_ptr,
sizeof(CK_BBOOL)),
))
rv = self.p11.C_UnwrapKey(self.session, wrapping_mech_ptr,
unwrapping_key_object, wrapped_key,
wrapped_key_len, template,
sizeof(template) // sizeof(CK_ATTRIBUTE),
unwrapped_key_object_ptr)
check_return_value(rv, "import_wrapped_key: key unwrapping")
return unwrapped_key_object_ptr[0]
def set_attribute(self, key_object, attr, value):
"""
Set object attributes
"""
object = key_object
attribute_ptr = new_ptr(CK_ATTRIBUTE)
attribute_ptr.type = attr
if attr in (CKA_ALWAYS_AUTHENTICATE,
CKA_ALWAYS_SENSITIVE,
CKA_COPYABLE,
CKA_ENCRYPT,
CKA_EXTRACTABLE,
CKA_DECRYPT,
CKA_DERIVE,
CKA_LOCAL,
CKA_MODIFIABLE,
CKA_NEVER_EXTRACTABLE,
CKA_PRIVATE,
CKA_SENSITIVE,
CKA_SIGN,
CKA_SIGN_RECOVER,
CKA_TOKEN,
CKA_TRUSTED,
CKA_UNWRAP,
CKA_VERIFY,
CKA_VERIFY_RECOVER,
CKA_WRAP,
CKA_WRAP_WITH_TRUSTED):
attribute_ptr.pValue = true_ptr if value else false_ptr
attribute_ptr.ulValueLen = sizeof(CK_BBOOL)
elif attr == CKA_ID:
if not isinstance(value, bytes):
raise Error("Bytestring value expected")
attribute_ptr.pValue = new_array(CK_BYTE, value)
attribute_ptr.ulValueLen = len(value)
elif attr == CKA_LABEL:
if not isinstance(value, unicode):
raise Error("Unicode value expected")
label, label_length = unicode_to_char_array(value)
attribute_ptr.pValue = label
attribute_ptr.ulValueLen = label_length
elif attr == CKA_KEY_TYPE:
if not isinstance(value, int):
raise Error("Integer value expected")
attribute_ptr.pValue = new_ptr(unsigned_long, value)
attribute_ptr.ulValueLen = sizeof(unsigned_long)
else:
raise Error("Unknown attribute")
template = new_array(CK_ATTRIBUTE, (attribute_ptr[0],))
rv = self.p11.C_SetAttributeValue(self.session, object, template,
(sizeof(template) //
sizeof(CK_ATTRIBUTE)))
check_return_value(rv, "set_attribute")
def get_attribute(self, key_object, attr):
object = key_object
attribute_ptr = new_ptr(CK_ATTRIBUTE)
attribute_ptr.type = attr
attribute_ptr.pValue = NULL_PTR
attribute_ptr.ulValueLen = 0
template = new_array(CK_ATTRIBUTE, (attribute_ptr[0],))
rv = self.p11.C_GetAttributeValue(self.session, object, template,
(sizeof(template) //
sizeof(CK_ATTRIBUTE)))
if rv == CKR_ATTRIBUTE_TYPE_INVALID or template[0].ulValueLen == -1:
raise NotFound("attribute does not exist")
check_return_value(rv, "get_attribute init")
value = new_array(unsigned_char, template[0].ulValueLen)
template[0].pValue = value
rv = self.p11.C_GetAttributeValue(self.session, object, template,
(sizeof(template) //
sizeof(CK_ATTRIBUTE)))
check_return_value(rv, "get_attribute")
if attr in (CKA_ALWAYS_AUTHENTICATE,
CKA_ALWAYS_SENSITIVE,
CKA_COPYABLE,
CKA_ENCRYPT,
CKA_EXTRACTABLE,
CKA_DECRYPT,
CKA_DERIVE,
CKA_LOCAL,
CKA_MODIFIABLE,
CKA_NEVER_EXTRACTABLE,
CKA_PRIVATE,
CKA_SENSITIVE,
CKA_SIGN,
CKA_SIGN_RECOVER,
CKA_TOKEN,
CKA_TRUSTED,
CKA_UNWRAP,
CKA_VERIFY,
CKA_VERIFY_RECOVER,
CKA_WRAP,
CKA_WRAP_WITH_TRUSTED):
ret = bool(_ffi.cast(_ffi.getctype(CK_BBOOL, '*'), value)[0])
elif attr == CKA_LABEL:
ret = char_array_to_unicode(value, template[0].ulValueLen)
elif attr in (CKA_MODULUS, CKA_PUBLIC_EXPONENT, CKA_ID):
ret = string_to_pybytes_or_none(value, template[0].ulValueLen)
elif attr == CKA_KEY_TYPE:
ret = _ffi.cast(_ffi.getctype(unsigned_long, '*'), value)[0]
else:
raise Error("Unknown attribute")
return ret
# Key Classes
KEY_CLASS_PUBLIC_KEY = CKO_PUBLIC_KEY
KEY_CLASS_PRIVATE_KEY = CKO_PRIVATE_KEY
KEY_CLASS_SECRET_KEY = CKO_SECRET_KEY
# Key types
KEY_TYPE_RSA = CKK_RSA
KEY_TYPE_AES = CKK_AES
# Wrapping mech type
MECH_RSA_PKCS = CKM_RSA_PKCS
MECH_RSA_PKCS_OAEP = CKM_RSA_PKCS_OAEP
MECH_AES_KEY_WRAP = CKM_AES_KEY_WRAP
MECH_AES_KEY_WRAP_PAD = CKM_AES_KEY_WRAP_PAD
def gen_key_id(key_id_len=16):
"""
Generate random softhsm KEY_ID
:param key_id_len: this should be 16
:return: random softhsm KEY_ID in bytes representation
"""
return struct.pack(
"B" * key_id_len, # key_id must be bytes
*(random.randint(0, 255) for _ in range(key_id_len))
)
def generate_master_key(p11, keylabel=u"dnssec-master", key_length=16,
disable_old_keys=True):
assert isinstance(p11, P11_Helper)
key_id = None
while True:
# check if key with this ID exist in LDAP or softHSM
# id is 16 Bytes long
key_id = gen_key_id()
keys = p11.find_keys(KEY_CLASS_SECRET_KEY,
label=keylabel,
id=key_id)
if not keys:
break # we found unique id
p11.generate_master_key(keylabel,
key_id,
key_length=key_length,
cka_wrap=True,
cka_unwrap=True)
if disable_old_keys:
# set CKA_WRAP=False for old master keys
master_keys = p11.find_keys(KEY_CLASS_SECRET_KEY,
label=keylabel,
cka_wrap=True)
for handle in master_keys:
# don't disable wrapping for new key
# compare IDs not handle
if key_id != p11.get_attribute(handle, CKA_ID):
p11.set_attribute(handle, CKA_WRAP, False)