Examples

Github Export

This is basically a git export using Github's API. It recursivly download all files in a Git repo starting from a path and store the files in current directory, keeping the directory structure intact.

Create a file foopipes.yml like this:

version: 2
plugins: 
services: 
pipelines: 
  - 
    when: 
      - { service: queue, topic: started }
    from:
      - { task: httpget, url: "https://api.github.com/repos/AreteraAB/Foopipes.Examples/contents/Contentful" }
    to:
      - { task: publish, service: queue, topic: entry }
  -
    when:
      - { service: queue, topic: entry }
    from:
      - { task: where, value: "#{type}", equal: "file" }
    do: 
      - { task: set, key: "metadata:filename", value: "#{path}" }
      - { task: httpget, url: "#{download_url}", format: binary }
    to:
      - { task: file, path: ".", filename: "#{metadata:filename}" }
  -
    when:
      - { service: queue, topic: entry }
    from:
      - { task: where, value: "#{type}", equal: "dir" }
    do: 
      - { task: httpget, url: "#{url}"}
    to:
      - { task: publish, service: queue, topic: entry }

Start with:

docker run -it -v $(pwd):/project aretera/foopipes

Windows:

docker run -it -v %CD%:/project aretera/foopipes

How it works

At startup an initial request is done to the api and each entry in the repository is put into a queue with topic entry. Two pipelines are configured to process each entry in the queue, one for files and one for directories.

The Api returns json similar to this: (shortened for brevity)

[
  {
    "name": "Readme.md",
    "path": "Contentful/Readme.md",
    "sha": "1dbc524a326ab5a2bc1b4e84cc247256702be01f",
    "url": "https://api.github.com/repos/AreteraAB/Foopipes.Examples/contents/Contentful/Readme.md?ref=master",

    "download_url": "https://raw.githubusercontent.com/AreteraAB/Foopipes.Examples/master/Contentful/Readme.md",
    "type": "file" 
  },
  ....
]

Data binding expressions like #{download_url} are used for performing tasks on each entry.

If it the entry is a file it is downloaded and stored to disk. If it is a directory, the directory┬┤s entries are put on the queue.

We use a metadata placeholder for the path and filename which is inherited down in the pipeline to the remaining tasks and used in the to section when storing the downloaded file to disk.

Note: The Github api has rate restrictions for unauthenticated users (currenly 60 requests per hour). When this limit is reached you'll get a 403 Forbidden.

Super simple service monitoring

Create a free Mailgun account and create a foopipes.yml similar to this:

services:
  scheduler:
    type: scheduler
    interval: "00:01:00"
  http:
    type: http
    timeout: "00:00:05"

pipelines:
  -
    when:
      - { service: scheduler }
    do:
      - { task: httpget, url: "https://jsonplaceholder.typicode.com/posts" }
    then:
      - { task: publish, topic: serverdown, empty: true }
    error:
      - { task: publish, topic: serverdown }
  - 
    when:
      - { service: queue, topic: serverdown }
    do:
      - task: select
        to: "me@mydomain.com"
        from: "foopipes@mydomain.com"
        subject: "IT IS DOWN!"
        text: "It happened at: #{time}"
      - task: http
        url: "https://api.mailgun.net/v3/sandboxxxxx.mailgun.org/messages"
        method: post
        body: multipartFormUrlEncoded
        user: "api:key-xxxxxx"

If an error occurs or the service returns an empty json array, an email will be posted.

Email when someone pulls an image from Docker Hub

version: 2
services:
  scheduler:
    type: scheduler
    interval: "00:01:00"

pipelines:
  -
    when:
      - scheduler
    do:
      - http: "https://hub.docker.com/v2/repositories/aretera/foopipes/"
      - { where, value: "#{pull_count}", not: "#{file:pull_count}" }
      - { set: "file:pull_count", value: "#{pull_count}" }
    to:
      - publish: gotpull 
  - 
    when:
      - queue: gotpull
    do:
      - select
        to: me@mydomain.com"
        from: "foopipes@mydomain.com"
        subject: "Someone pulled"
        text: "Pull count is #{pull_count}"
      - http: "https://api.mailgun.net/v3/sandboxxxxx.mailgun.org/messages"
        method: post
        body: multipartFormUrlEncoded
        user: "api:key-xxxxxx"

Here we use file:pull_count for keeping state.

Corporate job adverts story

Publishing job adverts on a corporate website may seem like an easy task.

But what if the backend HR system is not up for it? Even if our website don't have problems to keep up with a 99.99% SLA, scaling backend systems to the required levels is costly if possible. The backend systems might not have all the functionallity you need anyway to provide a rich user experience.

This is an example of how to pull job adverts from the backend and store them into a searchable Elasticsearch database.

version: 2
plugins:
  - Elasticsearch

# Add a scheduler and Elasticsearch storage
services:
  scheduler:
    type: scheduler
    interval: "00:01:00" # Repeat every minute
  elasticsearch:
    type: elasticsearch
    url: "http://${elasticsearch|localhost}:9200"

pipelines:
  -
    when:
      - { service: scheduler } # Pipeline triggered by scheduler
    from:
      - { task: httpget, url: "https://myhrsystem/api/job?p=${CLIENT_KEY}" } # Fetch jobs
    do:
      - { task: select, path: "jobs[*]" } # One entity for each job advert
    to: 
      - { task: store, service: elasticsearch, index: hr, dataType: job, key: "#{assignment_id}" }  # Store entities in Elasticsearch

Now you can easily free text search all company job adverts. http://localhost:9200/hr/_search?janitor

But as polling can cause delays, we'd like to expose a webhook that is called from the backend when a new job advert is published.

# Add webhook endpoint listener on path /webhook
services:
  mywebhook: 
    type: httplistener,
    path: webhook

pipelines:
  -
    when: 
      - { service: scheduler }
      - { service: mywebhook } # Trigger pipeline on webhook 
    from: 
      - { task: httpget, url: "https://myhrsystem/api/job?p=${CLIENT_KEY}" }
    do:
      - { task: select, path: "jobs[*]" }
    to:
      - { task: store, service: elasticsearch, index: hr, dataType: job, key: "#{assignment_id}" }

Seldom the service's schema is what you desire for easy consumption, so you'd like to do some tricky data manipulation before you store the entities. Here we can transform each entity by invoking our own Node.js module.

pipelines:
  -
    when:
      - { service: scheduler }
      - { service: mywebhook }
    from:
      - { task: httpget, url: "https://myhrsystem/api/job?p=${CLIENT_KEY}" }
    do:
      - { task: select, path: "jobs[*]" }
      - { task: node, module: mymodule, function: doCrazySchemaChanges } # Invoke Node module for each result in previous task
    to:
      - { task: store, service: elasticsearch, index: hr, dataType: job, key: "#{assignment_id}" }

As the number of job adverts increases, eventually it will be beneficial to get incremental changes from their updates-api. Here we can keep state by using Elasticsearch as a simple key-value storage.

pipelines:
  -
    when:
      - { service: scheduler }
      - { service: mywebhook }
    from:
      - { task: httpget, url: "https://myhrsystem/api/job?p=${CLIENT_KEY}&token=#{elasticsearch:token}" } # Lookup "token" from Elasticsearch
    do:
      - { task: set, key: "elasticsearch:token", value: "#{nextToken}"} # Store received updated token in Elasticsearch
      - { task: select, path: "jobs[*]" }
      - { task: node, module: mymodule, function: doCrazySchemaChanges }
    to:
      - { task: store, service: elasticsearch, index: hr, dataType: job, key: "#{assignment_id}" }

The images stored with each job advert is raw full size photos, never intended to be published on a mobile website. So we need to scale the images.

Lets add a pipeline for image scaling.

TODO

Also create a Node.js module in Typescript like this: (plain es6 Javacript can also be used if prefered)

This example uses the well known Imagemagick together with a NPM package named gm. It reads from a stream, resizes the image and returns a ProcessStreamResult.

import * as foopipes from "foopipes";
import ICallContext = foopipes.ICallContext;
import IResult = foopipes.IResult;
import ProcessJsonResult = foopipes.ProcessJsonResult;
import ProcessStreamResult = foopipes.ProcessStreamResult;
import IStreamEvent = foopipes.IStreamEvent;

var gm = require('gm').subClass({imageMagick: true});

export function ImageResize(event: IStreamEvent, context: ICallContext): Promise<IResult>
{       
  return new Promise<IResult>((resolve, reject) => {
        const r = new ProcessStreamResult(context);

        // Invoke the 'gm' NPM module, and have it pipe the resulting image data back
        let transform = gm(event.stream,'img.png')
            .resize(50, 50);

        var str;
        if (context.metadata["image_url"].indexOf(".gif")>0)
            str = transform.stream("png");
        else
            str = transform.stream();

        str.pipe(r.stream).on('finish', ()=>resolve(r));
    });  
}