Skip to content

Commit

Permalink
Update to custom mediatype detection (without Tika requirement)
Browse files Browse the repository at this point in the history
  • Loading branch information
titusz committed Jul 6, 2021
1 parent 6ec4d72 commit 268a07c
Show file tree
Hide file tree
Showing 12 changed files with 667 additions and 372 deletions.
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,10 @@ You may also want to join our developer chat on Telegram at <https://t.me/iscc_dev>

## Change Log

### [0.9.12] - 2021-07-16
- Update to custom mediatype detection (without Tika requirement)
- Update dependencies

### [0.9.11] - 2020-06-12
- Update dependencies
- Remove support for creating ISCC codes from youtube urls
Expand Down Expand Up @@ -303,5 +307,5 @@ You may also want to join our developer chat on Telegram at <https://t.me/iscc_dev>

## License

MIT © 2019-2020 Titusz Pan
MIT © 2019-2021 Titusz Pan

2 changes: 1 addition & 1 deletion iscc_cli/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import iscc_cli


__version__ = "0.9.11"
__version__ = "0.9.12"
APP_NAME = "iscc-cli"
APP_DIR = click.get_app_dir(APP_NAME, roaming=False)
os.makedirs(iscc_cli.APP_DIR, exist_ok=True)
Expand Down
7 changes: 4 additions & 3 deletions iscc_cli/commands/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@
from os.path import basename, abspath
import click
import mobi
from iscc_cli.tika import detector, parser
from iscc_cli.tika import parser
import iscc
from iscc_cli import video_id
from iscc_cli.const import SUPPORTED_MIME_TYPES, GMT
from iscc_cli.utils import get_files, mime_to_gmt, get_title, DefaultHelp, clean_mime
from iscc_cli.utils import get_files, mime_to_gmt, get_title, DefaultHelp
from iscc_cli import audio_id, fpcalc
from loguru import logger as log
from iscc_cli.mediatype import mime_guess, mime_clean


@click.command(cls=DefaultHelp)
Expand Down Expand Up @@ -52,7 +53,7 @@ def batch(path, recursive, guess, debug):
log.warning(msg)
continue

media_type = clean_mime(detector.from_file(f))
media_type = mime_clean(mime_guess(f))
if media_type not in SUPPORTED_MIME_TYPES:
fname = basename(f)
msg = "Unsupported file {} with mime type: {},,,,".format(fname, media_type)
Expand Down
7 changes: 4 additions & 3 deletions iscc_cli/commands/dump.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@
import click
import mobi
from click import UsageError
from iscc_cli.tika import parser, detector
from iscc_cli.utils import DefaultHelp, clean_mime
from iscc_cli.tika import parser
from iscc_cli.utils import DefaultHelp
from iscc_cli.const import SUPPORTED_MIME_TYPES
import json
from iscc_cli.mediatype import mime_guess, mime_clean


@click.command(cls=DefaultHelp)
Expand All @@ -20,7 +21,7 @@
def dump(path, strip, meta, content):
"""Dump Tika extraction results for PATH (file or url path)."""

media_type = clean_mime(detector.from_file(path))
media_type = mime_clean(mime_guess(path))

if media_type not in SUPPORTED_MIME_TYPES:
click.echo("Unsupported media type {}.".format(media_type))
Expand Down
8 changes: 4 additions & 4 deletions iscc_cli/commands/gen.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@
import click
import iscc
import mobi
from iscc_cli.tika import detector, parser

from iscc_cli.tika import parser
from iscc_cli import audio_id, video_id, fpcalc
from iscc_cli.const import SUPPORTED_MIME_TYPES, GMT
from iscc_cli.utils import get_title, mime_to_gmt, DefaultHelp, clean_mime
from iscc_cli.utils import get_title, mime_to_gmt, DefaultHelp
from iscc_cli.mediatype import mime_guess, mime_clean


@click.command(cls=DefaultHelp)
Expand All @@ -33,7 +33,7 @@ def gen(file, guess, title, extra, verbose):
if not filesize:
raise click.BadParameter("Cannot proccess empty file: {}".format(file.name))

media_type = clean_mime(detector.from_file(file.name))
media_type = mime_clean(mime_guess(file.name))
if media_type not in SUPPORTED_MIME_TYPES:
click.echo("Unsupported media type {}.".format(media_type))
click.echo("Please request support at https://github.com/iscc/iscc-cli/issues")
Expand Down
6 changes: 3 additions & 3 deletions iscc_cli/commands/web.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import iscc
import mobi
import requests
from iscc_cli.tika import parser, detector
from iscc_cli.tika import parser
import iscc_cli
from iscc_cli import fpcalc, audio_id, video_id
from iscc_cli.const import SUPPORTED_MIME_TYPES, GMT
Expand All @@ -15,8 +15,8 @@
mime_to_gmt,
DefaultHelp,
download_file,
clean_mime,
)
from iscc_cli.mediatype import mime_guess, mime_clean

HEADERS = {"User-Agent": "ISCC {}".format(iscc_cli.__version__)}

Expand Down Expand Up @@ -46,7 +46,7 @@ def web(url, guess, title, extra, verbose):
raise click.BadArgumentUsage(e)

data = BytesIO(resp.content)
media_type = clean_mime(detector.from_buffer(data))
media_type = mime_clean(mime_guess(data))
if media_type not in SUPPORTED_MIME_TYPES:
click.echo("Unsupported media type {}".format(media_type))
click.echo("Please request support at https://github.com/iscc/iscc-cli/issues")
Expand Down
21 changes: 21 additions & 0 deletions iscc_cli/datatypes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# -*- coding: utf-8 -*-
import mmap
from enum import Enum
from io import BytesIO, BufferedReader
from pathlib import Path
from typing import Union, BinaryIO

# Raw binary content held in memory.
Data = Union[bytes, bytearray, memoryview]
# Location of content given as a string or a pathlib.Path.
Uri = Union[str, Path]
# Already-open binary file-like objects.
File = Union[BinaryIO, mmap.mmap, BytesIO, BufferedReader]
# Any input kind accepted where readable content is expected (location, raw data or open file).
Readable = Union[Uri, Data, File]


class GMT(str, Enum):
    """Generic Media Type.

    Members mix in `str`, so every member compares equal to its plain string
    value (e.g. ``GMT.text == "text"``).
    """

    text = "text"
    image = "image"
    audio = "audio"
    video = "video"
    unknown = "unknown"
188 changes: 188 additions & 0 deletions iscc_cli/mediatype.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
# -*- coding: utf-8 -*-
from loguru import logger
from typing import List, Optional, Union
import mimetypes
import magic
from PIL import Image
from iscc_cli import uread


# Names exported by `from iscc_cli.mediatype import *`.
__all__ = [
"mime_guess",
"mime_normalize",
"mime_supported",
"mime_clean",
"mime_to_gmt",
"mime_from_name",
"mime_from_data",
]


def mime_guess(data, file_name=None):
    # type: (Readable, Optional[str]) -> Optional[str]
    """Heuristic guessing of mediatype for different kinds of inputs.

    Matching by file name (extension) is tried first. Only if that yields no
    result is the mediatype sniffed from the first 4096 bytes of content.

    :param data: File path, url, raw bytes or an open binary file-like object.
    :param file_name: Optional name to use for extension matching. When omitted,
        the `name` (or `filename`) attribute of the opened stream is used.
    :return: Normalized mediatype string, or None if nothing could be guessed.
    """
    stream = uread.open_data(data)
    # File-like inputs are handed back as-is by open_data and stay owned by the
    # caller; every other input yields a stream created here that must be closed.
    owns_stream = stream is not data

    try:
        if file_name is None:
            if hasattr(stream, "name"):
                file_name = stream.name
            elif hasattr(stream, "filename"):
                file_name = stream.filename

        # Streams opened from a file descriptor expose an integer `name`,
        # which cannot be used for extension matching.
        if isinstance(file_name, int):
            file_name = None

        if file_name:
            guess_name = mime_normalize(mime_from_name(file_name))
            if guess_name:
                return guess_name

        return mime_normalize(mime_from_data(stream.read(4096)))
    finally:
        if owns_stream:
            stream.close()
        else:
            # Leave the caller's stream rewound instead of 4096 bytes in.
            stream.seek(0)


def mime_normalize(mime: str) -> str:
    """Translate a mediatype alias to its canonical name.

    Values without an entry in MEDIATYPE_NORM are returned unchanged.
    """
    if mime in MEDIATYPE_NORM:
        return MEDIATYPE_NORM[mime]
    return mime


def mime_supported(mime: str) -> bool:
    """Tell whether the (normalized) mediatype is listed in SUPPORTED_MEDIATYPES."""
    canonical = mime_normalize(mime)
    return canonical in SUPPORTED_MEDIATYPES


def mime_from_name(name: str) -> Optional[str]:
    """Derive the mediatype from a filename or url; None when it is not recognized."""
    mediatype, _encoding = mimetypes.guess_type(name)
    return mediatype


def mime_from_data(data: bytes) -> Optional[str]:
    """Sniff the mediatype from raw (header) bytes using `magic`."""
    sniffed = magic.from_buffer(data, mime=True)
    return sniffed


def mime_clean(mime: Union[str, List]):
    """
    Clean mimetype/content-type string or first entry of a list of mimetype strings.
    Also removes semicolon separated encoding information.

    :param mime: Mimetype string (optionally with `;`-separated parameters) or a
        list of such strings, of which only the first entry is used.
    :return: Bare mimetype without parameters and surrounding whitespace. An
        empty string is returned when no mimetype is given (None, "" or []).
    """
    if mime and isinstance(mime, list):
        mime = mime[0]
    if not mime:
        # Nothing to clean; previously this fell through to `.strip()` and raised.
        return ""
    return mime.split(";")[0].strip()


def mime_to_gmt(mime_type: str, file_path=None):
    """Get generic mediatype from mimetype.

    :param mime_type: Mimetype string (parameters such as charset are ignored).
    :param file_path: Optional path to the file. When given for a GIF, the file
        is inspected so animated GIFs are reported as "video".
    :return: Generic mediatype ("text", "image", "audio", "video", ...) or None
        if it cannot be determined.
    """
    # GMT is not part of this module's import block, so bring it into scope here.
    from iscc_cli.datatypes import GMT

    if not mime_type:
        return None

    mime_type = mime_clean(mime_type)
    if mime_type == "image/gif" and file_path:
        # Context manager makes sure the image file handle is released.
        with Image.open(file_path) as img:
            animated = getattr(img, "is_animated", False)
        return "video" if animated else "image"

    entry = SUPPORTED_MEDIATYPES.get(mime_type)
    if entry:
        return entry["gmt"]

    # Fall back to the top-level type (the part before the slash).
    gmt = mime_type.split("/")[0]
    if gmt in list(GMT):
        logger.warning(f"Guessing GMT from {mime_type}")
        # NOTE(review): assumes this return belongs to the `if` branch (unknown
        # top-level types yield None) - confirm against the repository.
        return gmt
    return None


# Register additional extension -> mediatype mappings with the stdlib `mimetypes`
# registry, which mime_from_name() queries through mimetypes.guess_type().
mimetypes.add_type("text/markdown", ".md")
mimetypes.add_type("text/markdown", ".markdown")
mimetypes.add_type("application/x-mobipocket-ebook", ".mobi")
mimetypes.add_type("application/x-sqlite3", ".sqlite")
# NOTE(review): ".f4v" is registered as video/mp4 here, while SUPPORTED_MEDIATYPES
# lists "f4v" under video/quicktime - confirm which mapping is intended.
mimetypes.add_type("video/mp4", ".f4v")


# Registry of supported mediatypes, keyed by mimetype string. Each entry holds:
#   "gmt": the generic media type ("text", "image", "audio" or "video")
#   "ext": the file extension(s), either a single string or a list of strings
SUPPORTED_MEDIATYPES = {
# Text Formats
"application/rtf": {"gmt": "text", "ext": "rtf"},
"application/msword": {"gmt": "text", "ext": "doc"},
"application/pdf": {"gmt": "text", "ext": "pdf"},
"application/epub+zip": {"gmt": "text", "ext": "epub"},
"text/xml": {"gmt": "text", "ext": "xml"},
"application/json": {"gmt": "text", "ext": "json"},
"application/xhtml+xml": {"gmt": "text", "ext": "xhtml"},
"application/vnd.oasis.opendocument.text": {"gmt": "text", "ext": "odt"},
"text/html": {"gmt": "text", "ext": "html"},
"text/plain": {"gmt": "text", "ext": "txt"},
"application/x-ibooks+zip": {"gmt": "text", "ext": "ibooks"},
"text/markdown": {"gmt": "text", "ext": ["md", "markdown"]},
"application/vnd.openxmlformats-officedocument.wordprocessingml.document": {
"gmt": "text",
"ext": "docx",
},
"application/vnd.openxmlformats-officedocument.spreadsheetml.sheet": {
"gmt": "text",
"ext": "xlsx",
},
# Note: pptx only detected by file extension. Sniffing gives 'application/zip'
"application/vnd.openxmlformats-officedocument.presentationml.presentation": {
"gmt": "text",
"ext": "pptx",
},
"application/vnd.ms-excel": {"gmt": "text", "ext": "xls"},
"application/x-mobipocket-ebook": {
"gmt": "text",
"ext": ["mobi", "prc", "azw", "azw3", "azw4"],
},
# Image Formats
"image/bmp": {"gmt": "image", "ext": "bmp"},
"image/gif": {"gmt": "image", "ext": "gif"},
"image/jpeg": {"gmt": "image", "ext": ["jpg", "jpeg"]},
"image/png": {"gmt": "image", "ext": "png"},
"image/tiff": {"gmt": "image", "ext": "tif"},
"image/vnd.adobe.photoshop": {"gmt": "image", "ext": "psd"},
"application/postscript": {"gmt": "image", "ext": "eps"},
# Audio Formats
"audio/mpeg": {"gmt": "audio", "ext": "mp3"},
"audio/wav": {"gmt": "audio", "ext": "wav"},
"audio/x-wav": {"gmt": "audio", "ext": "wav"},
"audio/ogg": {"gmt": "audio", "ext": "ogg"},
"audio/aiff": {"gmt": "audio", "ext": "aif"},
"audio/x-aiff": {"gmt": "audio", "ext": "aif"},
"audio/x-flac": {"gmt": "audio", "ext": "flac"},
"audio/opus": {"gmt": "audio", "ext": "opus"},
# Video Formats
"application/vnd.rn-realmedia": {"gmt": "video", "ext": "rm"},
"video/x-dirac": {"gmt": "video", "ext": "drc"},
"video/3gpp": {"gmt": "video", "ext": "3gp"},
"video/3gpp2": {"gmt": "video", "ext": "3g2"},
"video/x-ms-asf": {"gmt": "video", "ext": "asf"},
"video/avi": {"gmt": "video", "ext": "avi"},
"video/webm": {"gmt": "video", "ext": "webm"},
"video/mpeg": {"gmt": "video", "ext": ["mpeg", "mpg", "m1v", "vob"]},
"video/mp4": {"gmt": "video", "ext": "mp4"},
"video/x-m4v": {"gmt": "video", "ext": "m4v"},
"video/x-matroska": {"gmt": "video", "ext": "mkv"},
"video/ogg": {"gmt": "video", "ext": ["ogg", "ogv"]},
"video/quicktime": {"gmt": "video", "ext": ["mov", "f4v"]},
"video/x-flv": {"gmt": "video", "ext": "flv"},
"application/x-shockwave-flash": {"gmt": "video", "ext": "swf"},
"video/h264": {"gmt": "video", "ext": "h264"},
"video/x-ms-wmv": {"gmt": "video", "ext": "wmv"},
}

# Maps alternative mediatype spellings to the canonical name returned by
# mime_normalize(); mediatypes not listed here are passed through unchanged.
MEDIATYPE_NORM = {
"audio/x-aiff": "audio/aiff",
"audio/x-wav": "audio/wav",
"image/x-ms-bmp": "image/bmp",
"video/x-msvideo": "video/avi",
}

def _collect_extensions(mediatypes):
    """Return the unique file extensions declared in `mediatypes`.

    :param mediatypes: Mapping of mimetype -> entry dict whose "ext" value is
        either a single extension string or a list of extension strings.
    :return: List of extensions in first-seen order without duplicates.
    """
    # A dict is used as an ordered set: keys keep insertion order and an
    # extension shared by several mediatypes (e.g. "wav", "ogg") is kept once.
    unique = {}
    for entry in mediatypes.values():
        ext = entry["ext"]
        for item in [ext] if isinstance(ext, str) else ext:
            unique.setdefault(item, None)
    return list(unique)


# Flat list of all file extensions covered by SUPPORTED_MEDIATYPES.
SUPPORTED_EXTENSIONS = _collect_extensions(SUPPORTED_MEDIATYPES)
24 changes: 24 additions & 0 deletions iscc_cli/uread.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# -*- coding: utf-8 -*-
import io
from typing import Union
from iscc_cli.datatypes import Readable, Uri, File, Data
from typing import BinaryIO
from iscc_cli.utils import download_file


def open_data(data):
    # type: (Readable) -> Union[BinaryIO]
    """Return a binary file object for a filepath/url, raw data or file-like object.

    Paths are opened in "rb" mode (http/https urls are downloaded first), raw
    data is wrapped in a BytesIO, and file-like objects are rewound to position
    0 and returned as-is. Any other input type raises ValueError.
    """
    if isinstance(data, Uri.__args__):
        is_remote = isinstance(data, str) and data.startswith(
            ("http://", "https://")
        )
        local_path = download_file(data, sanitize=True) if is_remote else data
        return open(str(local_path), "rb")

    if isinstance(data, Data.__args__):
        return io.BytesIO(data)

    if isinstance(data, File.__args__):
        data.seek(0)
        return data

    raise ValueError(f"unsupported data type {type(data)}")
8 changes: 4 additions & 4 deletions iscc_cli/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import iscc
import requests
from PIL import Image

import iscc_cli
from iscc_cli.const import (
SUPPORTED_EXTENSIONS,
Expand Down Expand Up @@ -88,11 +87,12 @@ def mime_to_gmt(mime_type, file_path=None):

def get_title(tika_result: dict, guess=False, uri=None):
title = ""
gmt = None
meta = tika_result.get("metadata")
mime_type = clean_mime(meta.get("Content-Type"))
gmt = mime_to_gmt(mime_type)

if meta:
mime_type = clean_mime(meta.get("Content-Type"))
gmt = mime_to_gmt(mime_type)
title = meta.get("dc:title", "")
title = title[0].strip() if isinstance(title, list) else title.strip()
if not title:
Expand Down Expand Up @@ -166,7 +166,7 @@ def iscc_split(i):
def download_file(url, md5=None, sanitize=False):
"""Download file to app dir and return path."""
url_obj = urlparse(url)
file_name = os.path.basename(url_obj.path)
file_name = os.path.basename(url_obj.path) or "temp.file"
if sanitize:
file_name = safe_filename(file_name)
out_path = os.path.join(iscc_cli.APP_DIR, file_name)
Expand Down
Loading

0 comments on commit 268a07c

Please # to comment.