Skip to content

Commit

Permalink
initial blob support
Browse files Browse the repository at this point in the history
  • Loading branch information
dobe committed Sep 17, 2013
1 parent 2060084 commit c02c8fe
Show file tree
Hide file tree
Showing 8 changed files with 292 additions and 57 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,5 @@ build
*.pyc
.idea/
.tox/
*.iml
*.DS_Store
2 changes: 2 additions & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ Changes for crate
Unreleased
==========

- basic Blob-Client-API implemented

2013/09/09 0.0.3
================

Expand Down
39 changes: 33 additions & 6 deletions src/crate/client/blob.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import hashlib


class BlobContainer(object):
Expand All @@ -11,12 +12,38 @@ def __init__(self, container_name, connection):
self.container_name = container_name
self.conn = connection

@property
def _blob_path(self):
return self.container_name + '/_blobs/'

def stats(self):
return self.conn.client._request('GET', self._blob_path)
def _compute_digest(self, f):
f.seek(0)
m = hashlib.sha1()
while True:
d = f.read()
if d:
m.update(d)
else:
f.seek(0)
return m.hexdigest()

def put(self, f, digest=None):
if digest:
actual_digest = digest
else:
actual_digest = self._compute_digest(f)

created = self.conn.client.blob_put(self.container_name, actual_digest, f)
if digest:
return created
else:
return actual_digest

def get(self, digest):
return self.conn.client.blob_get(self.container_name, digest)

def delete(self, digest):
return self.conn.client.blob_del(self.container_name, digest)

def exists(self, digest):
return self.conn.client.blob_exists(self.container_name, digest)

def __repr__(self):
return "<BlobContainer '{}'>".format(self.container_name)

96 changes: 66 additions & 30 deletions src/crate/client/blobs.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
Crate BLOB API
==============

The Crate client library provides an API to access the powerful BLOB storage
The Crate client library provides an API to access the powerful Blob storage
capabilities of the Crate server.

First, a connection object is required. This can be retrieved by importing the
Expand All @@ -11,44 +11,80 @@ client module and then connecting to one or more crate server::
>>> from crate import client
>>> connection = client.connect(crate_host)

The connection object can then be used to retrieve a BLOB container. A BLOB
container is a special kind of table that can contain files. On these
containers it is possible to define the number of shards and replicas.

To access a container the connection object provides a `get_blob_container`
method::
Every table which has Blob support enabled, may act as a container for
Blobs. The ``BlobContainer`` object for a specific table can be
retrieved like this::

>>> blob_container = connection.get_blob_container('myfiles')
>>> blob_container
<BlobContainer 'myfiles'>

This container object can now be used to either get statistics about the
stored files, to download, upload or delete files::
The returned container object can now be used to manage the contained
Blobs.

Uploading Blobs
===============

To upload a Blob the ``put`` method can be used. This method takes a
file like object and an optional SHA-1 digest as argument.

In this example we upload a file without specifying the SHA-1 digest::

>>> from tempfile import TemporaryFile
>>> f = TemporaryFile()
>>> f.write("this is the content of the file")
>>> f.flush()

The actual ``put`` - it returns the computed SHA-1 digest upon completion.

>>> print blob_container.put(f)
6d46af79aa5113bd7e6a67fae9ab5228648d3f81

.. note::

Omitting the SHA-1 digest results in one extra read of the file
contents to compute the digest before the actual upload
starts. Therefore, if the application already has a SHA-1 digest for
the content, or is able to compute the digest on another read
upfront, providing the digest will lead to better performance.

Here is another example, wich provides the digest in the call::

>>> blob_container.stats()
>>> f.seek(0)
>>> blob_container.put(f, digest='6d46af79aa5113bd7e6a67fae9ab5228648d3f81')
False

To upload a file the `.put()` method can be used. The method takes either a
file like object, a generator or the file content as string.
.. note::

.. doctest::
:hide:
The above call returned ``False`` because the object already
existed. Since the digest is already known by the caller and it makes no
sense to return it again, a boolean gets returned which indicates if
the Blob was newly created or not.

Retrieving Blobs
================

Retrieving a blob can be done by using the ``get`` method like this::

>>> res = blob_container.get('6d46af79aa5113bd7e6a67fae9ab5228648d3f81')

The result is a generator object which returns one chunk per iteration.

>>> print res.next()
this is the content of the file

It is also possible to check if a blob exists like this::

>>> blob_container.exists('6d46af79aa5113bd7e6a67fae9ab5228648d3f81')
True

Deleting Blobs
==============

>>> from tempfile import mkstemp
>>> from os import fdopen
>>> tmp_file = mkstemp()
>>> tmp_file_name = tmp_file[1]
>>> tmp_file_obj = fdopen(tmp_file[0])
>>> tmp_file_obj.write('aaaa')
>>> tmp_file_obj.close()
To delet a blob just call the ``delete`` method, the resulting boolean
states weather a blob existed under the given digest or not::

Using file like objects or a generator is recommended since then it is possible
to stream the files without reading them into memory::
>>> blob_container.delete('6d46af79aa5113bd7e6a67fae9ab5228648d3f81')
True

>>> f = open(tmp_file_name)
>>> blob_container.put(f)
ASDF

The client will then compute then compute the `SHA-1` hash of the file which is
required by the Crate server to store the file. And then upload the file using
that hash. If the upload was succesful the hash is then returned. If it failed
`None` is returned instead.
1 change: 0 additions & 1 deletion src/crate/client/connection.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@

from .cursor import Cursor
from .exceptions import ProgrammingError
from .http import Client
Expand Down
65 changes: 60 additions & 5 deletions src/crate/client/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,64 @@
from __future__ import absolute_import
from exceptions import StandardError

class ProgrammingError(Exception):
"""Base class for exceptions in this module."""

class Error(StandardError):
pass


class Warning(StandardError):
pass


class InterfaceError(Error):
pass


class DatabaseError(Error):
pass


class InternalError(DatabaseError):
pass


class OperationalError(DatabaseError):
pass


class ProgrammingError(DatabaseError):
pass


class IntegrityError(DatabaseError):
pass


class DataError(DatabaseError):
pass


class NotSupportedError(DatabaseError):
pass


# exceptions not in db api

class ConnectionError(OperationalError):
pass



class BlobException(Exception):

def __init__(self, table, digest):
self.table = table
self.digest = digest

def __str__(self):
return "{table}/{digest}".format(table=self.table, digest=self.digest)

class DigestNotFoundException(BlobException):
pass


class ConnectionError(Exception):
"""Crate HTTP API Exception"""
pass
86 changes: 71 additions & 15 deletions src/crate/client/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from datetime import datetime
from operator import itemgetter

from crate.client.exceptions import ConnectionError
from crate.client.exceptions import ConnectionError, DigestNotFoundException

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -46,26 +46,72 @@ def sql(self, stmt):
if not isinstance(stmt, basestring):
raise ValueError("stmt is not a string type")

content = self._request('POST', self.sql_path, dict(stmt=stmt))

content = self._json_request('POST', self.sql_path, data=dict(stmt=stmt))
logger.debug("JSON response for stmt(%s): %s", stmt, content)

return content

def _request(self, method, path, data=None):
def _blob_path(self, table, digest=None):
path = table + '/_blobs/'
if digest:
path += digest
return path

def blob_put(self, table, digest, data):
"""
Issue request against the crate HTTP API.
Stores the contents of the file like @data object in a blob under the given table and
digest.
"""
response = self._request('PUT', self._blob_path(table, digest), data=data)
if response.status_code == 201:
return True
elif response.status_code == 409:
return False
response.raise_for_status()

def blob_del(self, table, digest):
"""
Deletes the blob with given digest under the given table.
"""
response = self._request('DELETE', self._blob_path(table, digest))
if response.status_code == 204:
return True
elif response.status_code == 404:
return False
response.raise_for_status()


def blob_get(self, table, digest, chunk_size=1024*128):
"""
Returns a file like object representing the contents of the blob with the given
digest.
"""
response = self._request('GET', self._blob_path(table, digest), stream=True)

if response.status_code == 404:
raise DigestNotFoundException(table, digest)
response.raise_for_status()
return response.iter_content(chunk_size=chunk_size)

def blob_exists(self, table, digest):
"""
Returns true if the blob with the given digest exists under the given table.
"""
response = self._request('HEAD', self._blob_path(table, digest))
if response.status_code == 200:
return True
elif response.status_code == 404:
return False
response.raise_for_status()


def _request(self, method, path, **kwargs):
while True:
server = self._get_server()
try:
# build uri and send http request
uri = "http://{server}/{path}".format(server=server, path=path)
if data:
data = json.dumps(data)
response = requests.request(method, uri, data=data,
timeout=self._http_timeout)
return requests.request(method, uri, timeout=self._http_timeout, **kwargs)
except (requests.ConnectionError, requests.Timeout,
requests.TooManyRedirects) as ex:
# drop server from active ones
Expand All @@ -75,14 +121,24 @@ def _request(self, method, path, data=None):
if not self._active_servers:
raise ConnectionError(
("No more Servers available, "
"exception from last server: %s") % ex_message)
else:
# raise error if occurred, otherwise nothing is raised
response.raise_for_status()
"exception from last server: %s") % ex_message)


def _json_request(self, method, path, data=None):
"""
Issue request against the crate HTTP API.
"""

# return parsed json response
return response.json(cls=DateTimeDecoder)
if data:
data = json.dumps(data)
response = self._request(method, path, data=data)

# raise error if occurred, otherwise nothing is raised
response.raise_for_status()
# return parsed json response
if len(response.content)>0:
return response.json(cls=DateTimeDecoder)
return response.content

def _get_server(self):
"""
Expand Down
Loading

0 comments on commit c02c8fe

Please # to comment.