import os
import base64
import json
import mimetypes
import re
import sys
import time
import tempfile
import sysconfig

from collections import OrderedDict
from contextlib import contextmanager
from http.cookiejar import parse_ns_headers
from pathlib import Path
from pprint import pformat
from urllib.parse import urlsplit
from typing import Any, List, Optional, Tuple, Generator, Callable, Iterable, IO, TypeVar

import requests.auth

RE_COOKIE_SPLIT = re.compile(r', (?=[^ ;]+=)')
Item = Tuple[str, Any]
Items = List[Item]
T = TypeVar("T")


class JsonDictPreservingDuplicateKeys(OrderedDict):
    """A specialized JSON dict preserving duplicate keys."""

    # Python versions prior to 3.8 suffer from an issue with multiple keys with the same name.
    # `json.dumps(obj, indent=N, sort_keys=True)` will output sorted keys when they are unique, and
    # duplicate keys will be outputted as they were defined in the original data.
    # See <https://bugs.python.org/issue23493#msg400929> for the behavior change between Python versions.
    SUPPORTS_SORTING = sys.version_info >= (3, 8)

    def __init__(self, items: Items):
        self._items = items
        self._ensure_items_used()

    def _ensure_items_used(self) -> None:
        """HACK: Force `json.dumps()` to use `self.items()` instead of an empty dict.

        Two JSON encoders are available on CPython: pure-Python (1) and C (2) implementations.

        (1) The pure-python implementation will do a simple `if not dict: return '{}'`,
        and we could fake that check by implementing the `__bool__()` method.
        Source:
            - <https://github.com/python/cpython/blob/9d318ad/Lib/json/encoder.py#L334-L336>

        (2) On the other hand, the C implementation will do a check on the number of
        items contained inside the dict, using a verification on `dict->ma_used`, which
        is updated only when an item is added/removed from the dict. For that case,
        there is no workaround but to add an item into the dict.
        Sources:
            - <https://github.com/python/cpython/blob/9d318ad/Modules/_json.c#L1581-L1582>
            - <https://github.com/python/cpython/blob/9d318ad/Include/cpython/dictobject.h#L53>
            - <https://github.com/python/cpython/blob/9d318ad/Include/cpython/dictobject.h#L17-L18>

        To please both implementations, we simply add one item to the dict.

        """
        if self._items:
            self['__hack__'] = '__hack__'

    def items(self) -> Items:
        """Return all items, duplicate ones included.

        """
        return self._items


def load_json_preserve_order_and_dupe_keys(s):
    return json.loads(s, object_pairs_hook=JsonDictPreservingDuplicateKeys)


def repr_dict(d: dict) -> str:
    return pformat(d)


def humanize_bytes(n, precision=2):
    # Author: Doug Latornell
    # Licence: MIT
    # URL: https://code.activestate.com/recipes/577081/
    """Return a humanized string representation of a number of bytes.

    >>> humanize_bytes(1)
    '1 B'
    >>> humanize_bytes(1024, precision=1)
    '1.0 kB'
    >>> humanize_bytes(1024 * 123, precision=1)
    '123.0 kB'
    >>> humanize_bytes(1024 * 12342, precision=1)
    '12.1 MB'
    >>> humanize_bytes(1024 * 12342, precision=2)
    '12.05 MB'
    >>> humanize_bytes(1024 * 1234, precision=2)
    '1.21 MB'
    >>> humanize_bytes(1024 * 1234 * 1111, precision=2)
    '1.31 GB'
    >>> humanize_bytes(1024 * 1234 * 1111, precision=1)
    '1.3 GB'

    """
    abbrevs = [
        (1 << 50, 'PB'),
        (1 << 40, 'TB'),
        (1 << 30, 'GB'),
        (1 << 20, 'MB'),
        (1 << 10, 'kB'),
        (1, 'B')
    ]

    if n == 1:
        return '1 B'

    for factor, suffix in abbrevs:
        if n >= factor:
            break

    # noinspection PyUnboundLocalVariable
    return f'{n / factor:.{precision}f} {suffix}'


class ExplicitNullAuth(requests.auth.AuthBase):
    """Forces requests to ignore the ``.netrc``.
    <https://github.com/psf/requests/issues/2773#issuecomment-174312831>
    """

    def __call__(self, r):
        return r


def get_content_type(filename):
    """
    Return the content type for ``filename`` in format appropriate
    for Content-Type headers, or ``None`` if the file type is unknown
    to ``mimetypes``.

    """
    return mimetypes.guess_type(filename, strict=False)[0]


def split_cookies(cookies):
    """
    When ``requests`` stores cookies in ``response.headers['Set-Cookie']``
    it concatenates all of them through ``, ``.

    This function splits cookies apart being careful to not to
    split on ``, `` which may be part of cookie value.
    """
    if not cookies:
        return []
    return RE_COOKIE_SPLIT.split(cookies)


def get_expired_cookies(
    cookies: str,
    now: float = None
) -> List[dict]:

    now = now or time.time()

    def is_expired(expires: Optional[float]) -> bool:
        return expires is not None and expires <= now

    attr_sets: List[Tuple[str, str]] = parse_ns_headers(
        split_cookies(cookies)
    )

    cookies = [
        # The first attr name is the cookie name.
        dict(attrs[1:], name=attrs[0][0])
        for attrs in attr_sets
    ]

    _max_age_to_expires(cookies=cookies, now=now)

    return [
        {
            'name': cookie['name'],
            'path': cookie.get('path', '/')
        }
        for cookie in cookies
        if is_expired(expires=cookie.get('expires'))
    ]


def _max_age_to_expires(cookies, now):
    """
    Translate `max-age` into `expires` for Requests to take it into account.

    HACK/FIXME: <https://github.com/psf/requests/issues/5743>

    """
    for cookie in cookies:
        if 'expires' in cookie:
            continue
        max_age = cookie.get('max-age')
        if max_age and max_age.isdigit():
            cookie['expires'] = now + float(max_age)


def parse_content_type_header(header):
    """Borrowed from requests."""
    tokens = header.split(';')
    content_type, params = tokens[0].strip(), tokens[1:]
    params_dict = {}
    items_to_strip = "\"' "
    for param in params:
        param = param.strip()
        if param:
            key, value = param, True
            index_of_equals = param.find("=")
            if index_of_equals != -1:
                key = param[:index_of_equals].strip(items_to_strip)
                value = param[index_of_equals + 1:].strip(items_to_strip)
            params_dict[key.lower()] = value
    return content_type, params_dict


def as_site(path: Path, **extra_vars) -> Path:
    site_packages_path = sysconfig.get_path(
        'purelib',
        vars={'base': str(path), **extra_vars}
    )
    return Path(site_packages_path)


def get_site_paths(path: Path) -> Iterable[Path]:
    from httpie.compat import (
        MIN_SUPPORTED_PY_VERSION,
        MAX_SUPPORTED_PY_VERSION,
        is_frozen
    )

    if is_frozen:
        [major, min_minor] = MIN_SUPPORTED_PY_VERSION
        [major, max_minor] = MAX_SUPPORTED_PY_VERSION
        for minor in range(min_minor, max_minor + 1):
            yield as_site(
                path,
                py_version_short=f'{major}.{minor}'
            )
    else:
        yield as_site(path)


def split(iterable: Iterable[T], key: Callable[[T], bool]) -> Tuple[List[T], List[T]]:
    left, right = [], []
    for item in iterable:
        if key(item):
            left.append(item)
        else:
            right.append(item)
    return left, right


def unwrap_context(exc: Exception) -> Optional[Exception]:
    context = exc.__context__
    if isinstance(context, Exception):
        return unwrap_context(context)
    else:
        return exc


def url_as_host(url: str) -> str:
    return urlsplit(url).netloc.split('@')[-1]


class LockFileError(ValueError):
    pass


@contextmanager
def open_with_lockfile(file: Path, *args, **kwargs) -> Generator[IO[Any], None, None]:
    file_id = base64.b64encode(os.fsencode(file)).decode()
    target_file = Path(tempfile.gettempdir()) / file_id

    # Have an atomic-like touch here, so we'll tighten the possibility of
    # a race occuring between multiple processes accessing the same file.
    try:
        target_file.touch(exist_ok=False)
    except FileExistsError as exc:
        raise LockFileError("Can't modify a locked file.") from exc

    try:
        with open(file, *args, **kwargs) as stream:
            yield stream
    finally:
        target_file.unlink()


def is_version_greater(version_1: str, version_2: str) -> bool:
    # In an ideal scenario, we would depend on `packaging` in order
    # to offer PEP 440 compatible parsing. But since it might not be
    # commonly available for outside packages, and since we are only
    # going to parse HTTPie's own version it should be fine to compare
    # this in a SemVer subset fashion.

    def split_version(version: str) -> Tuple[int, ...]:
        parts = []
        for part in version.split('.')[:3]:
            try:
                parts.append(int(part))
            except ValueError:
                break
        return tuple(parts)

    return split_version(version_1) > split_version(version_2)
