
.stream()

Stream records from your database one at a time or in batches, without first having to buffer the entire result set in memory.

await Something.stream(criteria)
.eachRecord(async (record)=>{

});

Usage

Argument      Type          Details
1 criteria    dictionary    The Waterline criteria to use for matching records in the database.

Iteratee

Use one of the following:

  • .eachRecord(async (record)=>{ ... })
  • .eachBatch(async (batch)=>{ ... })


Argument             Type                   Details
1 record or batch    dictionary or array    The current record, or the current batch of records. A batch always contains at least one and no more than thirty records.

Note that prior to Sails 1.1.0, the recommended usage of .stream() expected the iteratee to invoke a callback (next), which is provided as the second argument. This is no longer necessary as long as you do not actually include a second argument in the function signature.
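The two iteratee styles described above can be compared side by side. This is a minimal sketch: the function names and the `log` parameter are hypothetical, and `User` stands for any Waterline model.

```javascript
// Hypothetical helpers for illustration; `User` is any Waterline model and
// `log` is any function that accepts a string (e.g. sails.log).

// Older style: the iteratee declares a second argument (`next`),
// so it must call it when it is done with the record.
async function logFinnsWithCallback(User, log) {
  await User.stream({ name: 'Finn' })
  .eachRecord((user, next)=>{
    log(`Found a user ${user.id} named Finn.`);
    return next();
  });
}

// Sails 1.1.0 and later: the iteratee declares only one argument,
// so there is no callback to invoke.
async function logFinnsWithAsync(User, log) {
  await User.stream({ name: 'Finn' })
  .eachRecord(async (user)=>{
    log(`Found a user ${user.id} named Finn.`);
  });
}
```

Both functions log the same messages. The only difference is whether the iteratee's signature includes the second argument.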

Errors
Name            Type     When?
UsageError      Error    Thrown if something invalid was passed in.
AdapterError    Error    Thrown if something went wrong in the database adapter.
Error           Error    Thrown if anything else unexpected happens.

See Concepts > Models and ORM > Errors for examples of negotiating errors in Sails and Waterline.
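As a rough sketch of how the error names in the table above might be told apart, the helper below wraps a stream in try/catch and checks `err.name`. The function name `streamAndClassify` and its return values are hypothetical, chosen only for this illustration; `Model` stands for any Waterline model.

```javascript
// Hypothetical helper for illustration; the return values are arbitrary labels.
async function streamAndClassify(Model, criteria, iteratee) {
  try {
    await Model.stream(criteria).eachRecord(iteratee);
    return 'ok';
  } catch (err) {
    if (err.name === 'UsageError') {
      // Something invalid was passed in (e.g. bad criteria).
      return 'usage-error';
    }
    if (err.name === 'AdapterError') {
      // Something went wrong in the database adapter.
      return 'adapter-error';
    }
    // Anything else is unexpected, so let it propagate.
    throw err;
  }
}
```

In a real action you would probably respond or log instead of returning a label.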

When should I use this?

The .stream() method is almost exactly like .find(), except that it fetches records one batch at a time. Every time a batch of records is loaded, the iteratee function you provided is called one or more times. If you used .eachRecord(), your per-record function will be called once for each record in the batch. Otherwise, using .eachBatch(), your per-batch function will be called once with the entire batch.

This is useful for working with very large result sets: the kind that would overflow your server's available RAM if you tried to hold the entire thing in memory at once. You can use Waterline's .stream() method to do the kinds of things you might already be familiar with from Mongo cursors: preparing reports, looping over and modifying database records in a shell script, moving large amounts of data from one place to another, performing complex transformations, or even orchestrating map/reduce jobs.
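To make the batch-at-a-time behavior concrete, here is roughly what fetching page by page would look like if written by hand with .find(). This illustrates the behavior described above and is not Waterline's actual implementation. The function name `findInPages` and its parameters are hypothetical.

```javascript
// Illustrative only: a hand-rolled version of batch-at-a-time fetching.
// `Model` is any Waterline model; `pageSize` and `eachBatch` are names
// chosen for this sketch.
async function findInPages(Model, whereClause, pageSize, eachBatch) {
  var skip = 0;
  while (true) {
    // Load only one page of matching records into memory.
    var batch = await Model.find({
      where: whereClause,
      skip: skip,
      limit: pageSize
    });
    if (batch.length === 0) {
      break;
    }

    // Process this page before fetching the next one (in series).
    await eachBatch(batch);

    // A short page means there is nothing left to fetch.
    if (batch.length < pageSize) {
      break;
    }
    skip += pageSize;
  }
}
```

With .stream(), this bookkeeping is handled for you.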

Examples

There are 4 examples below.

Basic usage

An action that iterates over users named Finn in the database, one at a time:

await User.stream({name:'Finn'})
.eachRecord(async (user)=>{

  if (Math.random() > 0.5) {
    throw new Error('Oops!  This is a simulated error.');
  }

  sails.log(`Found a user ${user.id} named Finn.`);
});

Generating a dynamic sitemap

An action that responds with a dynamically-generated sitemap:

// e.g. in an action that handles `GET /sitemap.xml`:

var sitemapXml = '<urlset xmlns="http://www.sitemaps.org/schemas/sitemap/0.9">';

await BlogPost.stream()
.limit(50000)
.sort('title ASC')
.eachRecord((blogPost)=>{
  sitemapXml += (
    '<url>\n'+
    '  <loc>https://blog.example.com/' + _.escape(encodeURIComponent(blogPost.slug))+'</loc>\n'+
    '  <lastmod>'+_.escape(blogPost.updatedAt)+'</lastmod>\n'+
    '  <changefreq>monthly</changefreq>\n'+
    '</url>\n'
  );
});

sitemapXml += '</urlset>';

With .populate()

A snippet of a command-line script that searches for creepy comments from someone named "Bailey Bitterbumps" and reports them to the authorities:

// e.g. in a shell script

var numReported = 0;

await Comment.stream({ author: 'Bailey Bitterbumps' })
.limit(1000)
.skip(40)
.sort('title ASC')
.populate('attachedFiles', {
  limit: 3,
  sort: 'updatedAt'
})
.populate('fromBlogPost')
.eachRecord(async (comment)=>{

  var isCreepyEnoughToWorryAbout = comment.rawMessage.match(/creepy/) && comment.attachedFiles.length > 1;
  if (!isCreepyEnoughToWorryAbout) {
    return;
  }

  await sails.helpers.sendTemplateEmail.with({
    template: 'email-creepy-comment-notification',
    templateData: {
      url: `https://blog.example.com/${comment.fromBlogPost.slug}/comments/${comment.slug}`
    },
    to: '[email protected]',
    subject: 'Creepy comment alert'
  });

  numReported++;
});

sails.log(`Successfully reported ${numReported} creepy comments.`);

Batch-at-a-time

If we ran the code in the previous example, we'd be sending one email per creepy comment... which could be a lot! Not only would this be slow, it could mean sending thousands of individual API requests to our transactional email provider, quickly overwhelming our API rate limit.

For this case, we could use .eachBatch() to receive each batch of records as it is fetched, rather than processing individual records one at a time. Sending one request per batch instead of one per record dramatically reduces the number of API requests.
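A minimal sketch of that approach follows. The function name `reportCreepyComments`, the `sendEmail` parameter, and the shape of the email data are all hypothetical. They are passed in so the sketch does not depend on any particular app or email provider.

```javascript
// Hypothetical helper: `Comment` is a Waterline model and `sendEmail` is any
// async function that delivers a single message.
async function reportCreepyComments(Comment, sendEmail) {
  var numReported = 0;

  await Comment.stream({ author: 'Bailey Bitterbumps' })
  .eachBatch(async (batch)=>{

    // `batch` is an array of records.
    var creepyComments = batch.filter((comment)=>/creepy/.test(comment.rawMessage));
    if (creepyComments.length === 0) {
      return;
    }

    // One email (and so one API request) per batch, not per comment.
    await sendEmail({
      subject: 'Creepy comment alert',
      commentIds: creepyComments.map((comment)=>comment.id)
    });

    numReported += creepyComments.length;
  });

  return numReported;
}
```

In an app, `sendEmail` could be a thin wrapper around a helper such as the sendTemplateEmail helper used in the previous example.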

Notes

  • This method can be used with await, promise chaining, or traditional Node callbacks.
  • Internally, regardless of whether you're using .eachBatch() or .eachRecord(), Waterline grabs pages of 30 records at a time.
  • Just like async.eachSeries(), this method bails and throws an error (or calls its .exec() callback with an error) immediately upon receiving the first error from any iteratee.
  • .stream() runs the provided iteratee function on each record or batch, one at a time, in series.
  • Prior to Sails v1.0 / Waterline 0.13, this method had a lower-level interface, exposing a Readable "object stream". This was powerful, but tended to be error-prone. So the new, adapter-agnostic .stream() does not rely on emitters, or any particular flavor of Node streams. (Need to get it working the old way? Don't worry, with a little code, you can still easily build a streams2/streams3-compatible Readable "object stream" using the new interface.)
  • Read more about .stream() here, including additional examples, background information, and implementation details.
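
One possible way to build a Readable "object stream" on top of the new interface, as mentioned in the notes above, is sketched here using Node's built-in PassThrough stream. The name `toObjectStream` is hypothetical, `Model` is any Waterline model, and this is only one approach under those assumptions.

```javascript
var { PassThrough } = require('stream');

// Hypothetical helper: returns a Readable object stream that emits each
// matching record.
function toObjectStream(Model, criteria) {
  var out = new PassThrough({ objectMode: true });

  (async ()=>{
    try {
      await Model.stream(criteria)
      .eachRecord(async (record)=>{
        // write() returns false once the internal buffer is full, so wait
        // for 'drain' before asking Waterline for the next record.
        if (!out.write(record)) {
          await new Promise((resolve)=>out.once('drain', resolve));
        }
      });
      // All records have been written; signal the end of the stream.
      out.end();
    } catch (err) {
      // Surface any query or iteratee error as a stream 'error' event.
      out.destroy(err);
    }
  })();

  return out;
}
```

The result can be piped or consumed like any other object-mode stream. Note that this sketch does not handle a consumer that destroys the stream early while the iteratee is waiting for 'drain'.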

© 2012-2018 The Sails Company. 
The Sails framework is free and open-source under the MIT License.