Module certbot_dns_hostker.dns_hostker

DNS Authenticator for Hostker

Expand source code
""" DNS Authenticator for Hostker """

import logging
import zope.interface

from certbot import errors
from certbot import interfaces
from certbot.plugins import dns_common
from ker import Ker
from ker import HostkerRequestError

logger = logging.getLogger(__name__)

ACCOUNT_URL = 'https://www.hostker.com/'


def sub_domain_to_tld(domain):
    """
    Turn the input domain to get Top Level Domain
    Args:
        domain: any domain value
    Returns:
        TLD of input domain
    """
    domain_list = domain.split('.')[-2:]
    return '.'.join(domain_list)


def validate_domain_to_record(domain):
    """
    remove TLD part of validate domain record
    Args:
        domain: validate domain(include _acme-challenge.[input_domain]
    Returns:
        _acme-challenge.[input_domain_without_TLD]
    """
    domain_list = domain.split('.')[:-2]
    if len(domain_list) > 0 and domain_list[-1] == '*':
        domain_list = domain_list[:-1]
    return '.'.join(domain_list)


@zope.interface.implementer(interfaces.IAuthenticator)
@zope.interface.provider(interfaces.IPluginFactory)
class Authenticator(dns_common.DNSAuthenticator):
    """Dns Authenticator for Hostker"""

    description = (
        'Obtain certificates using a DNS TXT record(if you are using Hostker for DNS).')

    ttl = 60

    def __init__(self, *args, **kwargs):
        super(Authenticator, self).__init__(*args, **kwargs)
        self.credentials = None

    @classmethod
    def add_parser_arguments(cls, add): # pylint: disable=arguments-differ
        super(Authenticator, cls).add_parser_arguments(add)
        add('credentials', help='Hostker credentials INI file.')

    def more_info(self): # pylint: disable=no-self-use
        """output info"""
        return """This plugin configures a DNS TXT record to\
        response to a dns-01 challenge using the Hostker API"""

    def _setup_credentials(self):
        self.credentials = self._configure_credentials(
            'credentials',
            'Hostker credentials INI file',
            {
                'email': 'email address associated with Hostker account',
                'token': f'API token for Hostker account, obtained from {ACCOUNT_URL}'
            }
        )

    # add txt record
    def _perform(self, domain, validation_name, validation):
        record_name = validate_domain_to_record(validation_name)
        domain = sub_domain_to_tld(domain)
        self._get_hostker_client().add_txt_record(
            domain, record_name, validation, self.ttl)

    # delete txt record
    def _cleanup(self, domain, validation_name, validation):
        record_name = validate_domain_to_record(validation_name)
        domain = sub_domain_to_tld(domain)
        self._get_hostker_client().del_txt_record(domain, record_name)

    def _get_hostker_client(self):
        return HostkerClient(self.credentials.conf(
            'email'), self.credentials.conf('token'))


class HostkerClient:
    """Encapsulates all communication with the Hostker API"""

    def __init__(self, email, token):
        self.ker = Ker(email, token)

    def add_txt_record(self, domain, record_name, record_content, record_ttl):
        """
        Args:
            domain: etc. var.moe
            record_name: etc. sky
            record_content: etc. hello-world
            record_ttl: etc. 300
        """
        try:
            self.ker.dns.add(
                domain=domain,
                header=record_name,
                record_type='TXT',
                data=record_content,
                ttl=record_ttl)
        except HostkerRequestError as err:
            raise errors.PluginError(str(err))
        else:
            logger.debug('Successfully add TXT record')

    def del_txt_record(self, domain, record_name):
        """
        Args:
            domain: etc. var.moe
            record_name: etc. sky
        """
        record_ids = self._get_record_ids(domain, record_name)
        for unique_id in record_ids:
            try:
                self.ker.dns.delete(unique_id)
            except HostkerRequestError as err:
                raise errors.PluginError(str(err))
            else:
                logger.debug('Successfully remove TXT record')

    def _get_record_ids(self, domain, record_name):
        try:
            result = self.ker.dns.list(domain)
        except HostkerRequestError as err:
            raise errors.PluginError(str(err))
        else:
            logger.debug(result)
            records = list(
                filter(
                    lambda record: record['header'] == record_name and record['type'] == 'TXT',
                    result['records']))
            return [record['id'] for record in records]

Functions

def sub_domain_to_tld(domain)

Turn the input domain to get Top Level Domain

Args

domain
any domain value

Returns

TLD of input domain
 
Expand source code
def sub_domain_to_tld(domain):
    """
    Turn the input domain to get Top Level Domain
    Args:
        domain: any domain value
    Returns:
        TLD of input domain
    """
    domain_list = domain.split('.')[-2:]
    return '.'.join(domain_list)
def validate_domain_to_record(domain)

remove TLD part of validate domain record

Args

domain
validate domain(include _acme-challenge.[input_domain]

Returns

_acme-challenge.[input_domain_without_TLD]
 
Expand source code
def validate_domain_to_record(domain):
    """
    remove TLD part of validate domain record
    Args:
        domain: validate domain(include _acme-challenge.[input_domain]
    Returns:
        _acme-challenge.[input_domain_without_TLD]
    """
    domain_list = domain.split('.')[:-2]
    if len(domain_list) > 0 and domain_list[-1] == '*':
        domain_list = domain_list[:-1]
    return '.'.join(domain_list)

Classes

class Authenticator (*args, **kwargs)

Dns Authenticator for Hostker

Expand source code
class Authenticator(dns_common.DNSAuthenticator):
    """Dns Authenticator for Hostker"""

    description = (
        'Obtain certificates using a DNS TXT record(if you are using Hostker for DNS).')

    ttl = 60

    def __init__(self, *args, **kwargs):
        super(Authenticator, self).__init__(*args, **kwargs)
        self.credentials = None

    @classmethod
    def add_parser_arguments(cls, add): # pylint: disable=arguments-differ
        super(Authenticator, cls).add_parser_arguments(add)
        add('credentials', help='Hostker credentials INI file.')

    def more_info(self): # pylint: disable=no-self-use
        """output info"""
        return """This plugin configures a DNS TXT record to\
        response to a dns-01 challenge using the Hostker API"""

    def _setup_credentials(self):
        self.credentials = self._configure_credentials(
            'credentials',
            'Hostker credentials INI file',
            {
                'email': 'email address associated with Hostker account',
                'token': f'API token for Hostker account, obtained from {ACCOUNT_URL}'
            }
        )

    # add txt record
    def _perform(self, domain, validation_name, validation):
        record_name = validate_domain_to_record(validation_name)
        domain = sub_domain_to_tld(domain)
        self._get_hostker_client().add_txt_record(
            domain, record_name, validation, self.ttl)

    # delete txt record
    def _cleanup(self, domain, validation_name, validation):
        record_name = validate_domain_to_record(validation_name)
        domain = sub_domain_to_tld(domain)
        self._get_hostker_client().del_txt_record(domain, record_name)

    def _get_hostker_client(self):
        return HostkerClient(self.credentials.conf(
            'email'), self.credentials.conf('token'))

Ancestors

  • certbot.plugins.dns_common.DNSAuthenticator
  • certbot.plugins.common.Plugin

Class variables

var description

str(object='') -> str str(bytes_or_buffer[, encoding[, errors]]) -> str

Create a new string object from the given object. If encoding or errors is specified, then the object must expose a data buffer that will be decoded using the given encoding and error handler. Otherwise, returns the result of object.str() (if defined) or repr(object). encoding defaults to sys.getdefaultencoding(). errors defaults to 'strict'.

var ttl

int([x]) -> integer int(x, base=10) -> integer

Convert a number or string to an integer, or return 0 if no arguments are given. If x is a number, return x.int(). For floating point numbers, this truncates towards zero.

If x is not a number or if base is given, then x must be a string, bytes, or bytearray instance representing an integer literal in the given base. The literal can be preceded by '+' or '-' and be surrounded by whitespace. The base defaults to 10. Valid bases are 0 and 2-36. Base 0 means to interpret the base from the string as an integer literal.

>>> int('0b100', base=0)
4

Static methods

def add_parser_arguments(add)

Add plugin arguments to the CLI argument parser.

NOTE: If some of your flags interact with others, you can use cli.report_config_interaction to register this to ensure values are correctly saved/overridable during renewal.

:param callable add: Function that proxies calls to argparse.ArgumentParser.add_argument prepending options with unique plugin name prefix.

Expand source code
@classmethod
def add_parser_arguments(cls, add): # pylint: disable=arguments-differ
    super(Authenticator, cls).add_parser_arguments(add)
    add('credentials', help='Hostker credentials INI file.')

Methods

def more_info(self)

output info

Expand source code
def more_info(self): # pylint: disable=no-self-use
    """output info"""
    return """This plugin configures a DNS TXT record to\
    response to a dns-01 challenge using the Hostker API"""
class HostkerClient (email, token)

Encapsulates all communication with the Hostker API

Expand source code
class HostkerClient:
    """Encapsulates all communication with the Hostker API"""

    def __init__(self, email, token):
        self.ker = Ker(email, token)

    def add_txt_record(self, domain, record_name, record_content, record_ttl):
        """
        Args:
            domain: etc. var.moe
            record_name: etc. sky
            record_content: etc. hello-world
            record_ttl: etc. 300
        """
        try:
            self.ker.dns.add(
                domain=domain,
                header=record_name,
                record_type='TXT',
                data=record_content,
                ttl=record_ttl)
        except HostkerRequestError as err:
            raise errors.PluginError(str(err))
        else:
            logger.debug('Successfully add TXT record')

    def del_txt_record(self, domain, record_name):
        """
        Args:
            domain: etc. var.moe
            record_name: etc. sky
        """
        record_ids = self._get_record_ids(domain, record_name)
        for unique_id in record_ids:
            try:
                self.ker.dns.delete(unique_id)
            except HostkerRequestError as err:
                raise errors.PluginError(str(err))
            else:
                logger.debug('Successfully remove TXT record')

    def _get_record_ids(self, domain, record_name):
        try:
            result = self.ker.dns.list(domain)
        except HostkerRequestError as err:
            raise errors.PluginError(str(err))
        else:
            logger.debug(result)
            records = list(
                filter(
                    lambda record: record['header'] == record_name and record['type'] == 'TXT',
                    result['records']))
            return [record['id'] for record in records]

Methods

def add_txt_record(self, domain, record_name, record_content, record_ttl)

Args

domain
etc. var.moe
record_name
etc. sky
record_content
etc. hello-world
record_ttl
etc. 300
Expand source code
def add_txt_record(self, domain, record_name, record_content, record_ttl):
    """
    Args:
        domain: etc. var.moe
        record_name: etc. sky
        record_content: etc. hello-world
        record_ttl: etc. 300
    """
    try:
        self.ker.dns.add(
            domain=domain,
            header=record_name,
            record_type='TXT',
            data=record_content,
            ttl=record_ttl)
    except HostkerRequestError as err:
        raise errors.PluginError(str(err))
    else:
        logger.debug('Successfully add TXT record')
def del_txt_record(self, domain, record_name)

Args

domain
etc. var.moe
record_name
etc. sky
Expand source code
def del_txt_record(self, domain, record_name):
    """
    Args:
        domain: etc. var.moe
        record_name: etc. sky
    """
    record_ids = self._get_record_ids(domain, record_name)
    for unique_id in record_ids:
        try:
            self.ker.dns.delete(unique_id)
        except HostkerRequestError as err:
            raise errors.PluginError(str(err))
        else:
            logger.debug('Successfully remove TXT record')