Pipeline Tasks

Pipeline tasks are invoked in order. The output of a task is passed on to the next task unless keepResult is set to false. When a task outputs multiple results, the remaining tasks are invoked once for each result.
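
As a rough sketch of the fan-out described above, assume a hypothetical endpoint (the url is illustrative only) that returns a document with a jobs array. The select task outputs one result per job, so the log task after it would be invoked once for each job:

  do:
    - { httpget: "http://example.com/jobs.json", format: json }
    - { select: "$.jobs[*]" }
    - { log }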

All do tasks support these optional arguments:

  • publish - task output is published to the message queue with the specified topic.
  • await - true/false. Used with publish. If true, the remaining tasks are put on hold until all subscribers have finished.
  • keepResult - true/false. Whether the output of the task is passed on to the next task. Defaults to true.
  - { select: "assets[*]", publish: process_asset, await: true, keepResult: false }
  - { task: select, path: "assets[*]", publish: process_asset, await: true, keepResult: false }
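
A minimal sketch of how publish and await might be combined across two pipelines. The topic name assets_updated is hypothetical, and the example assumes that a pipeline subscribes to a topic through a when clause on the queue service. The first pipeline publishes each selected asset and waits for the subscribers before it continues; the second pipeline handles each published asset:

  -
    when:
      - { service: queue, topic: assets_updated }
    do:
      - { select: "assets[*]", publish: process_asset, await: true }
      - { log }
  -
    when:
      - { service: queue, topic: process_asset }
    do:
      - { log }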

Tasks and data format

Pipeline tasks can work with either json or binary data.

  -
    when:
      - {service: queue, topic: entry }
    do: 
      - {task: httpget, url: "#{fields.imageUrl}", format: binary }
      - {task: module, module: scaleImage, function: scaleForMobile }
    to:
      - {task: file, path: "c:\\temp", filename: "#{fields.imageId}" }

Here an image is fetched from a URL specified in the incoming message. The image is then processed by a Node.js module before it is saved to disk in binary format.

Built in pipeline tasks

delete

Delete a document from a storage service.

  • service - (default arg)
  • key
  • index
  • dataType
  - { delete: elasticsearch, index: myIndex, dataType: myDataType, key: "#{id}"}
  - { task: delete, service: elasticsearch, index: myIndex, dataType: myDataType, key: "#{id}"}

enumkeys

Enumerate the keys in a storage service.

  • service - (default arg)
  • index
  • dataType
  - { enumkeys: file, index: myIndex, dataType: myDataType }
  - { task: enumkeys, service: file, index: myIndex, dataType: myDataType }

file

Store json or binary data to disk.

  • service - optional, defaults to a service named file
  • path
  • filename - (default arg)
  - { file: "#{filename}", path: "/var/images"  }
  - { task: file, path: "/var/images", filename: "#{filename}"  }

httpget

Send an HTTP GET request to the specified url.

  • service - optional, defaults to a service named http
  • url - The request url (default arg)
  • format - Parse result as json, xml, binary, or auto.
  - { httpget: "http:#{file.url}", format: binary }
  - { task: httpget, url: "http:#{file.url}", format: binary }

http

Send an HTTP request to the specified url.

  • service - optional, defaults to a service named http
  • url - The request url (default arg)
  • method - get, post, put or delete
  • body - Request body format. auto, none, json, formUrlEncoded, multipartFormUrlEncoded
  • format - Parse result as json, xml, binary, or auto.
  • user - username:password for Basic authentication
  • headers - a set of HTTP headers to send with the request
  - { http: "https://requestb.in/1cygmkn1", method: post, format: binary, body: json }
  - { task: http, url: "https://requestb.in/1cygmkn1", method: post, format: binary, body: json }
  - http: "http://rss.cnn.com/rss/edition.rss"
    method: get
    format: xml
    headers:
      If-None-Match: "#{etagstore:etag}"

load

Load a document from a storage service.

  • service - (default arg)
  • key
  • index
  • dataType
  - { load: elasticsearch, index: myIndex, dataType: myDataType, key: "#{id}"}
  - { task: load, service: elasticsearch, index: myIndex, dataType: myDataType, key: "#{id}"}

module

Invoke a Node.js module.

  • module - (default arg)
  • function - (optional)
  - { module: myModule, function: myFunction }
  - { task: module, module: myModule, function: myFunction }

select/map

Select fields or path from json data.

For details on JSONPath expressions, see: http://goessner.net/articles/JsonPath/

  • path - json path. (default arg, optional)
  • xx - set a token named xx (optional)
  - { select: "$.jobs[*]" }
  - { task: select, path: "$.jobs[*]" }
  - { select, myId: "#{job.id}", whatever: foo }
  - { task: select, myId: "#{job.id}", whatever: foo }
  - select: 
    subject: Mail subject
    body: >
      This is a "body text" that can contain quotes and whatever

set

Set a value using a binding expression.

  • key - namespace:value binding expression (default arg)
  • value - The value
  - { set: "elasticsearch:mytoken", value: "#{nextToken}" }
  - { task: set, key: "elasticsearch:mytoken", value: "#{nextToken}" }
  - { set: "myobj.greeting", value: "Hello!" }
  - { task: set, key: "myobj.greeting", value: "Hello!" }

store

Store json data in a storage service.

  • service - (default arg)
  • key
  • index
  • dataType
  - { store: elasticsearch, index: myIndex, dataType: myDataType, key: "#{id}" }
  - { task: store, service: elasticsearch, index: myIndex, dataType: myDataType, key: "#{id}" }

where

Conditional data filtering.

  • path - json path expression (default arg)
  • value
  • equal
  • not
  • isnull - true or false
  - { where: "headers.X-Topic", equal: "ContentManagement.unpublish"}
  - { task: where, path: "headers.X-Topic", equal: "ContentManagement.unpublish"}
  - { where: "file.url", isnull: false }
  - { task: where, path: "file.url", isnull: false }
  - { where, value: "#{elasticsearch:isEnabled}", not: false }
  - { task: where, value: "#{elasticsearch:isEnabled}", not: false }

enqueue/publish

Publish a message to a message queue.

  • topic - (default arg)
  • defer - (optional) delay message delivery. hh:mm:ss format.
  • service - (optional) defaults to queue
  - { publish: "process_asset"  }
  - { enqueue: "process_asset"  }
  - { task: enqueue, topic: "process_asset", service: myqueue  }
  - { publish: "process_asset", defer: "00:00:10" }

command

Run an external command.

  • command - The command to run (default arg)
  • args - Command line arguments
  • stdout - (optional) true/false Capture stdout and use as task output
  • stdin - (optional) true/false Write input data to command's stdin
  • lineEndings - (optional) LF, CR, CRLF or LFCR line endings
  • encoding - (optional) Encoding name used for stdout and stdin.
  - { command: "c:\\util\\curl\\bin\\curl.exe", args: "--url https://echo.getpostman.com/post -d @-", stdout: true, stdin: true, lineEndings: LF }
  - { task: command, command: "c:\\util\\curl\\bin\\curl.exe", args: "--url https://echo.getpostman.com/post -d @-", stdout: true, stdin: true, lineEndings: LF }

exit

Exit the Foopipes process.

  - { exit }
  - { task: exit }

log

Log a string or document to the current logger.

  • value - (optional) The value to log (default arg)
  - { log }
  - { task: log }
  - { log: "That field is #{thatField}" }

readfile

Read file as json or binary.

  • path - (optional) Path to file
  • filename - Filename (default arg)
  • service - (optional) Defaults to a service named file
  • format - (optional) Read file as auto, binary or json.
  - { readfile: testdata\input.xml }
  - { task: readfile, filename: "testdata\\input.xml" }

parsejson

Convert binary data to json.

  do:
    - parsejson

parsexml

Convert binary data from xml to json.

  do:
    - parsexml
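
Similarly, a sketch that reads an xml file as binary, converts it to json and logs the resulting document. The file name is taken from the readfile example and is assumed to exist:

  do:
    - { readfile: "testdata\\input.xml", format: binary }
    - parsexml
    - { log }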

script

Preview - May be subject to change

Execute a .NET C# script.

  • script - (optional) The script to execute. (default arg)
  • src - (optional) Url to the script to execute.
  • file - (optional) Name of the file containing the script to execute.
  • imports - (optional) Comma-separated list of using statements.
  • replaceImports - (optional) true/false. Whether the imports replace or are appended to the default imports.
  • references - (optional) Semicolon-separated list of assemblies.
  • replaceReferences - (optional) true/false. Whether the references replace or are appended to the default references.
  do:
   - script: |     
       obj.hello = "world";
       return obj;