Examples
GitHub Export
This is essentially a Git export using GitHub's API. It recursively downloads all files in a Git repository, starting from a given path, and stores them in the current directory, keeping the directory structure intact.
Create a file foopipes.yml like this:
version: 2
plugins:
services:
pipelines:
  -
    when:
      - { service: queue, topic: started }
    from:
      - { task: httpget, url: "https://api.github.com/repos/AreteraAB/Foopipes.Examples/contents/Contentful" }
    to:
      - { task: publish, service: queue, topic: entry }
  -
    when:
      - { service: queue, topic: entry }
    from:
      - { task: where, value: "#{type}", equal: "file" }
    do:
      - { task: set, key: "metadata:filename", value: "#{path}" }
      - { task: httpget, url: "#{download_url}", format: binary }
    to:
      - { task: file, path: ".", filename: "#{metadata:filename}" }
  -
    when:
      - { service: queue, topic: entry }
    from:
      - { task: where, value: "#{type}", equal: "dir" }
    do:
      - { task: httpget, url: "#{url}" }
    to:
      - { task: publish, service: queue, topic: entry }
Start with:
docker run -it -v $(pwd):/project aretera/foopipes
On Windows:
docker run -it -v %CD%:/project aretera/foopipes
How it works
At startup an initial request is made to the API, and each entry in the repository is put on a queue with the topic entry. Two pipelines are configured to process each entry in the queue: one for files and one for directories.
The API returns JSON similar to this (shortened for brevity):
[
  {
    "name": "Readme.md",
    "path": "Contentful/Readme.md",
    "sha": "1dbc524a326ab5a2bc1b4e84cc247256702be01f",
    "url": "https://api.github.com/repos/AreteraAB/Foopipes.Examples/contents/Contentful/Readme.md?ref=master",
    "download_url": "https://raw.githubusercontent.com/AreteraAB/Foopipes.Examples/master/Contentful/Readme.md",
    "type": "file"
  },
  ...
]
Data binding expressions like #{download_url} are used to perform tasks on each entry. If the entry is a file, it is downloaded and stored to disk. If it is a directory, the directory's entries are put on the queue.
We use a metadata placeholder for the path and filename, which is inherited down the pipeline to the remaining tasks and used in the to section when storing the downloaded file to disk.
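To make the binding behaviour concrete, here is a hedged sketch (not Foopipes internals; resolveBindings is an invented name) of how a #{...} expression could be resolved against one entry of the JSON above:

```typescript
// Hypothetical illustration of resolving "#{...}" data binding expressions
// against a single repository entry from the GitHub contents API.
function resolveBindings(template: string, entry: Record<string, string>): string {
  // Replace every #{key} token with the corresponding property of the entry
  return template.replace(/#\{([^}]+)\}/g, (_, key: string) => entry[key] ?? "");
}

const entry = {
  path: "Contentful/Readme.md",
  download_url: "https://raw.githubusercontent.com/AreteraAB/Foopipes.Examples/master/Contentful/Readme.md",
  type: "file",
};

// "#{download_url}" resolves to the entry's download_url property,
// and "#{path}" to the path later used as the output filename.
const url = resolveBindings("#{download_url}", entry);
const filename = resolveBindings("#{path}", entry);
```

This is only meant to show the substitution idea; the real engine also resolves prefixed scopes such as metadata: and file:.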
Note: The GitHub API has rate restrictions for unauthenticated users (currently 60 requests per hour). When this limit is reached you'll get a 403 Forbidden.
Super simple service monitoring
Create a free Mailgun account and create a foopipes.yml similar to this:
services:
  scheduler:
    type: scheduler
    interval: "00:01:00"
  http:
    type: http
    timeout: "00:00:05"
pipelines:
  -
    when:
      - { service: scheduler }
    do:
      - { task: httpget, url: "https://jsonplaceholder.typicode.com/posts" }
    then:
      - { task: publish, topic: serverdown, empty: true }
    error:
      - { task: publish, topic: serverdown }
  -
    when:
      - { service: queue, topic: serverdown }
    do:
      - task: select
        to: "me@mydomain.com"
        from: "foopipes@mydomain.com"
        subject: "IT IS DOWN!"
        text: "It happened at: #{time}"
      - task: http
        url: "https://api.mailgun.net/v3/sandboxxxxx.mailgun.org/messages"
        method: post
        body: multipartFormUrlEncoded
        user: "api:key-xxxxxx"
If an error occurs or the service returns an empty JSON array, an email will be sent.
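The alerting rule the two pipelines encode can be stated as a single predicate. A minimal sketch, with invented names (serverIsDown is not a Foopipes API):

```typescript
// Alert when the health request fails outright (the "error" branch),
// or when it succeeds but returns an empty JSON array (the "empty: true" case).
function serverIsDown(requestFailed: boolean, body: unknown[]): boolean {
  return requestFailed || body.length === 0;
}
```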
Email when someone pulls an image from Docker Hub
version: 2
services:
  scheduler:
    type: scheduler
    interval: "00:01:00"
pipelines:
  -
    when:
      - scheduler
    do:
      - http: "https://hub.docker.com/v2/repositories/aretera/foopipes/"
      - { where: "#{pull_count}", not: "#{file:pull_count}" }
      - { set: "file:pull_count", value: "#{pull_count}" }
    to:
      - publish: gotpull
  -
    when:
      - queue: gotpull
    do:
      - select:
        to: "me@mydomain.com"
        from: "foopipes@mydomain.com"
        subject: "Someone pulled"
        text: "Pull count is #{pull_count}"
      - http: "https://api.mailgun.net/v3/sandboxxxxx.mailgun.org/messages"
        method: post
        body: multipartFormUrlEncoded
        user: "api:key-xxxxxx"
Here we use file:pull_count for keeping state.
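The change-detection logic of the first pipeline can be sketched in plain TypeScript. This is a hedged illustration with invented names (shouldNotify, and a Map standing in for the file: storage), not the engine's implementation:

```typescript
// The Map plays the role of the persistent "file:" key-value storage.
const state = new Map<string, number>();

function shouldNotify(pullCount: number): boolean {
  const previous = state.get("pull_count"); // "#{file:pull_count}"
  if (previous === pullCount) return false; // unchanged: filtered out by "where ... not"
  state.set("pull_count", pullCount);       // the "set" task persists the new value
  return true;                              // changed: publish to the gotpull topic
}
```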
Corporate job adverts story
Publishing job adverts on a corporate website may seem like an easy task.
But what if the backend HR system is not up for it? Even if our website has no problem keeping up with a 99.99% SLA, scaling backend systems to the required levels is costly, if it is possible at all. The backend systems might not have all the functionality you need anyway to provide a rich user experience.
This is an example of how to pull job adverts from the backend and store them into a searchable Elasticsearch database.
version: 2
plugins:
  - Elasticsearch

# Add a scheduler and Elasticsearch storage
services:
  scheduler:
    type: scheduler
    interval: "00:01:00" # Repeat every minute
  elasticsearch:
    type: elasticsearch
    url: "http://${elasticsearch|localhost}:9200"

pipelines:
  -
    when:
      - { service: scheduler } # Pipeline triggered by scheduler
    from:
      - { task: httpget, url: "https://myhrsystem/api/job?p=${CLIENT_KEY}" } # Fetch jobs
    do:
      - { task: select, path: "jobs[*]" } # One entity for each job advert
    to:
      - { task: store, service: elasticsearch, index: hr, dataType: job, key: "#{assignment_id}" } # Store entities in Elasticsearch
Now you can easily free-text search all company job adverts, e.g. http://localhost:9200/hr/_search?q=janitor
But as polling can cause delays, we'd like to expose a webhook that is called from the backend when a new job advert is published.
# Add webhook endpoint listener on path /webhook
services:
  mywebhook:
    type: httplistener
    path: webhook
pipelines:
  -
    when:
      - { service: scheduler }
      - { service: mywebhook } # Trigger pipeline on webhook
    from:
      - { task: httpget, url: "https://myhrsystem/api/job?p=${CLIENT_KEY}" }
    do:
      - { task: select, path: "jobs[*]" }
    to:
      - { task: store, service: elasticsearch, index: hr, dataType: job, key: "#{assignment_id}" }
The service's schema is seldom what you want for easy consumption, so you'd like to do some tricky data manipulation before you store the entities. Here we can transform each entity by invoking our own Node.js module.
pipelines:
  -
    when:
      - { service: scheduler }
      - { service: mywebhook }
    from:
      - { task: httpget, url: "https://myhrsystem/api/job?p=${CLIENT_KEY}" }
    do:
      - { task: select, path: "jobs[*]" }
      - { task: node, module: mymodule, function: doCrazySchemaChanges } # Invoke Node module for each result in previous task
    to:
      - { task: store, service: elasticsearch, index: hr, dataType: job, key: "#{assignment_id}" }
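What doCrazySchemaChanges might do is not shown in this document, so here is a hedged sketch of such a transform. The job fields (assignment_id, title_internal, valid_from) and the output shape are invented for illustration; only the idea of being invoked once per selected entity comes from the pipeline above:

```typescript
// Invented backend schema for one job advert.
interface RawJob {
  assignment_id: string;
  title_internal: string;
  valid_from: string;
}

// Invented consumer-friendly schema stored in Elasticsearch.
interface Job {
  id: string;
  title: string;
  published: string;
}

// Map the backend's awkward field names to something easy to consume.
export function doCrazySchemaChanges(job: RawJob): Job {
  return {
    id: job.assignment_id,
    title: job.title_internal.trim(),
    published: new Date(job.valid_from).toISOString(),
  };
}
```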
As the number of job adverts increases, it will eventually be beneficial to get incremental changes from the backend's updates API. Here we can keep state by using Elasticsearch as a simple key-value storage.
pipelines:
  -
    when:
      - { service: scheduler }
      - { service: mywebhook }
    from:
      - { task: httpget, url: "https://myhrsystem/api/job?p=${CLIENT_KEY}&token=#{elasticsearch:token}" } # Lookup "token" from Elasticsearch
    do:
      - { task: set, key: "elasticsearch:token", value: "#{nextToken}" } # Store received updated token in Elasticsearch
      - { task: select, path: "jobs[*]" }
      - { task: node, module: mymodule, function: doCrazySchemaChanges }
    to:
      - { task: store, service: elasticsearch, index: hr, dataType: job, key: "#{assignment_id}" }
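The token is a cursor: each response carries a nextToken that we persist and send back on the following poll, so only changes since the last request are returned. A minimal sketch of the pattern, with a mock updates API (all names here are illustrative, not the HR system's real interface):

```typescript
interface UpdatesResponse { jobs: string[]; nextToken: number; }

// Mock updates API: returns only the jobs added after the given token.
const allJobs = ["job-1", "job-2", "job-3"];
function fetchUpdates(token: number): UpdatesResponse {
  return { jobs: allJobs.slice(token), nextToken: allJobs.length };
}

let token = 0;                       // plays the role of "#{elasticsearch:token}"
const first = fetchUpdates(token);   // first poll: everything is new
token = first.nextToken;             // the "set" task persists the cursor
const second = fetchUpdates(token);  // second poll: nothing has changed
```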
The images stored with each job advert are raw full-size photos, never intended to be published on a mobile website. So we need to scale the images.
Let's add a pipeline for image scaling.
TODO
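As a hedged sketch only (the topic name, paths, and binding names are invented; the task syntax is borrowed from the examples above), such a pipeline could look something like:

```yaml
pipelines:
  -
    when:
      - { service: queue, topic: image }  # hypothetical topic carrying image entries
    from:
      - { task: httpget, url: "#{image_url}", format: binary }
    do:
      - { task: node, module: mymodule, function: ImageResize } # Invoke the Node module below
    to:
      - { task: file, path: "images", filename: "#{metadata:filename}" }
```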
Also create a Node.js module in TypeScript like this (plain ES6 JavaScript can also be used if preferred). This example uses the well-known ImageMagick together with an npm package named gm. It reads from a stream, resizes the image and returns a ProcessStreamResult.
import * as foopipes from "foopipes";
import ICallContext = foopipes.ICallContext;
import IResult = foopipes.IResult;
import ProcessStreamResult = foopipes.ProcessStreamResult;
import IStreamEvent = foopipes.IStreamEvent;

const gm = require('gm').subClass({ imageMagick: true });

export function ImageResize(event: IStreamEvent, context: ICallContext): Promise<IResult> {
    return new Promise<IResult>((resolve, reject) => {
        const r = new ProcessStreamResult(context);

        // Invoke the 'gm' NPM module, and have it pipe the resulting image data back
        const transform = gm(event.stream, 'img.png')
            .resize(50, 50);

        // Convert gif images to png; keep the original format for everything else
        let str;
        if (context.metadata["image_url"].indexOf(".gif") > 0)
            str = transform.stream("png");
        else
            str = transform.stream();

        str.pipe(r.stream).on('finish', () => resolve(r));
    });
}