s3-lambda
enables you to run lambda functions over a context of S3 objects. It has a stateless architecture with concurrency control, allowing you to process a large number of files very quickly. This is useful for quickly prototyping complex data jobs without infrastructure like Hadoop or Spark.
At Littlstar, we use s3-lambda for all sorts of data pipelining and analytics.
npm install s3-lambda --save
const S3Lambda = require('s3-lambda')
// example options
const lambda = new S3Lambda({
accessKeyId: 'aws-access-key', // Optional. (falls back on local AWS credentials)
secretAccessKey: 'aws-secret-key', // Optional. (falls back on local AWS credentials)
showProgress: true, // Optional. Show progress bar in stdout
verbose: true, // Optional. Show all S3 operations in stdout (GET, PUT, DELETE)
maxRetries: 10, // Optional. Maximum request retries on an S3 object. Defaults to 10.
timeout: 10000 // Optional. Amount of time for request to timeout. Defaults to 10000 (10s)
})
const context = {
bucket: 'my-bucket',
prefix: 'path/to/files/'
}
lambda
.context(context)
.forEach(object => {
// do something with object
})
.then(_ => console.log('done!'))
.catch(console.error)
Before initiating a lambda expression, you must tell s3-lambda which files to operate over by calling context. A context is defined by an options object with the following properties: bucket, prefix, endPrefix, marker, limit, and reverse.
lambda.context({
bucket: 'my-bucket', // The S3 bucket to use
prefix: 'prefix/', // The prefix of the files to use - s3-lambda will operate over every file with this prefix.
endPrefix: 'prefix/file3', // Optional. End at this prefix. Defaults to null
marker: 'prefix/file1', // Optional. Start at this prefix. If it is a full file path, starts with next file. Defaults to null.
limit: 1000, // Optional. Limit the # of files operated over. Default is Infinity.
reverse: false // Optional. If true, operate over all files in reverse. Defaults to false.
})
You can also provide an array of context options, which tells s3-lambda to operate over all the files in each.
const ctx1 = {
bucket: 'my-bucket',
prefix: 'path/to/files/',
marker: 'path/to/logs/2017'
}
const ctx2 = {
bucket: 'my-other-bucket',
prefix: 'path/to/other/logs/',
limit: 100
}
lambda.context([ctx1, ctx2])
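The resulting request behaves like any other; for example, a sketch chaining a forEach over both contexts:

lambda
  .context([ctx1, ctx2])
  .forEach(object => { /* sees files from both contexts */ })
  .then(_ => console.log('done!'))
  .catch(console.error)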
After setting context, you can chain several other functions that modify the operation. Each returns a Request object, so they can be chained. All of these are optional.
concurrency(n)
{Number} Set the request concurrency level (default is Infinity).
transform(fn)
{Function} Sets the transformation function to use when getting objects. The function takes the object as an argument and should return the transformed object.
Example: unzipping compressed S3 files before each operation
const zlib = require('zlib')
lambda
.context(context)
.transform((object) => {
return zlib.gunzipSync(object).toString()
})
.each(...)
encoding(encoding)
{String} Sets the encoding to use when getting objects.
limit(n)
{Number} Limit the number of files operated over.
reverse(bool)
{Boolean} Reverse the order of files operated over.
async()
Lets the resolver know that your function is async (returns a Promise).
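For example, a minimal sketch chaining several of these modifiers before an each call (assuming the context's objects are gzipped, and that reverse takes a boolean as described above):

const zlib = require('zlib')

lambda
  .context(context)
  .concurrency(10) // process up to 10 objects at a time
  .limit(500) // only operate over the first 500 files
  .reverse(true) // iterate over the files in reverse order
  .transform(object => zlib.gunzipSync(object).toString('utf8')) // unzip each object before it reaches fn
  .each(object => console.log(object.length))
  .then(_ => console.log('done!'))
  .catch(console.error)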
Perform synchronous or asynchronous functions over each file in the set context.
- each
- forEach
- map
- reduce
- filter
each(fn[, isasync])
Performs fn on each S3 object in parallel. You can set the concurrency level (defaults to Infinity). If isasync is true, fn should return a Promise.
lambda
.context(bucket, prefix)
.concurrency(5) // operates on 5 objects at a time
.each(object => console.log(object))
.then(_ => console.log('done!'))
.catch(console.error)
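When fn is asynchronous, pass true as the second argument so the resolver waits for each returned Promise. A minimal sketch (the artificial delay is only illustrative):

const handler = object =>
  new Promise((resolve) => {
    setTimeout(() => {
      console.log(object.length)
      resolve()
    }, 100)
  })

lambda
  .context(bucket, prefix)
  .concurrency(5)
  .each(handler, true) // isasync is true, so handler must return a Promise
  .then(_ => console.log('done!'))
  .catch(console.error)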
forEach(fn[, isasync])
Same as each, but operates sequentially, one file at a time. Setting concurrency for this function is superfluous.
lambda
.context(bucket, prefix)
.forEach(object => { /* do something with object */ })
.then(_ => console.log('done!'))
.catch(console.error)
map(fn[, isasync])
Destructive. Maps fn over each file in an S3 directory, replacing each file with what is returned from the mapper function. If isasync is true, fn should return a Promise.
const addSmiley = object => object + ':)'
lambda
.context(bucket, prefix)
.map(addSmiley)
.then(_ => console.log('done!'))
.catch(console.error)
You can make this non-destructive by specifying an output directory.
const outputBucket = 'my-bucket'
const outputPrefix = 'path/to/output/'
lambda
.context(bucket, prefix)
.output(outputBucket, outputPrefix)
.map(addSmiley)
.then(_ => console.log('done!'))
.catch(console.error)
reduce(func[, isasync])
Reduces the objects in the working context to a single value.
// concatenates all the files
const reducer = (previousValue, currentValue, key) => {
return previousValue + currentValue
}
lambda
.context(bucket, prefix)
.reduce(reducer)
.then(result => { /* do something with result */ })
.catch(console.error)
filter(func[, isasync])
Destructive. Filters (deletes) files in S3. func should return true to keep the object, and false to delete it. If isasync is true, func should return a Promise.
// filters empty files
const fn = object => object.length > 0
lambda
.context(bucket, prefix)
.filter(fn)
.then(_ => console.log('done!'))
.catch(console.error)
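A sketch of an asynchronous filter, where the predicate resolves to a boolean (the validation logic here is a hypothetical stand-in for a real async check):

const isValid = object => Promise.resolve(object.length > 0)

lambda
  .context(bucket, prefix)
  .filter(isValid, true) // isasync is true, so isValid must return a Promise
  .then(_ => console.log('done!'))
  .catch(console.error)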
Just like in map, you can make this non-destructive by specifying an output directory.
lambda
.context(bucket, prefix)
.output(outputBucket, outputPrefix)
.filter(fn)
.then(_ => console.log('done!'))
.catch(console.error)
Promise-based wrapper around common S3 methods.
- list
- keys
- get
- put
- copy
- delete
list(bucket, prefix[, marker])
List all keys in s3://bucket/prefix. If you use a marker, s3-lambda will start listing alphabetically from there.
lambda
.list(bucket, prefix)
.then(list => console.log(list))
.catch(console.error)
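For example, to start listing partway through a prefix (the marker value here is illustrative):

lambda
  .list(bucket, prefix, 'path/to/files/2017-01-01') // begins alphabetically from this key
  .then(list => console.log(list))
  .catch(console.error)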
keys(bucket, prefix[, marker])
Returns an array of keys for the given bucket and prefix.
lambda
.keys(bucket, prefix)
.then(keys => console.log(keys))
.catch(console.error)
get(bucket, key[, encoding[, transformer]])
Gets an object in S3, calling toString(encoding) on objects.
lambda
.get(bucket, key)
.then(object => { /* do something with object */ })
.catch(console.error)
Optionally you can supply your own transformer function to use when retrieving objects.
const zlib = require('zlib')
const transformer = object => {
return zlib.gunzipSync(object).toString('utf8')
}
lambda
.get(bucket, key, null, transformer)
.then(object => { /* do something with object */ })
.catch(console.error)
put(bucket, key, object[, encoding])
Puts an object in S3. Default encoding is utf8.
lambda
.put(bucket, key, 'hello world!')
.then(_ => console.log('done!')).catch(console.error)
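A sketch using the optional encoding argument for a binary payload, assuming the encoding is any string Buffer accepts (e.g. base64); the helper returning base64 data is hypothetical:

const base64Data = readImageAsBase64() // hypothetical helper returning a base64 string

lambda
  .put(bucket, 'images/logo.png', base64Data, 'base64')
  .then(_ => console.log('done!'))
  .catch(console.error)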
copy(sourceBucket, sourceKey, targetBucket, targetKey)
Copies an object in S3 from s3://sourceBucket/sourceKey to s3://targetBucket/targetKey.
lambda
.copy(sourceBucket, sourceKey, targetBucket, targetKey)
.then(_ => console.log('done!')).catch(console.error)
delete(bucket, key)
Deletes an object in S3 (s3://bucket/key).
lambda
.delete(bucket, key)
.then(_ => console.log('done!')).catch(console.error)