Marcel pipes streams of tuples between operators. Relational databases operate on tables, which are sets of tuples, and queries yield sets of tuples. This similarity makes it very easy to add database access to marcel.
You configure database access in the configuration file, ~/.marcel.py. For example:
define_db(name='mgdb', driver='psycopg2', dbname='megacorp', user='hr', password='workers_R_us')
This specifies a database profile named mgdb. It accesses a Postgres database using the psycopg2 driver, (only psycopg2 is supported for now). The Postgres database is named megacorp, and we login using the specified user name and password.
We could configure other profiles, to access other databases, and to connect as different users. We can specify a default profile, to avoid having to specify a profile on every database access:
DB_DEFAULT = 'mgdb'
Now that the database is configured, we can access the database using the sql operator. For example, to retrieve all rows from the Employee table:
sql 'select * from Employee'
The query yields all rows from the Employee table. These are turned into Python tuples by the database driver, then the sql operator writes these tuples into its output pipe. This allows the piping of database output to other marcel operators. E.g., we could pipe the rows to another operator which writes them to a file:
sql 'select * from Employee' \ | out --csv -f '/home/hr/Documents/all_emps.csv'
The out operator generates CSV-formatted output, and writes to the specified file.
You can also load data using marcel. For example, suppose that you want to record in a database, all processes running on your machine, every second, in a process table, with columns storing time (seconds since the epoch), process pid, and the process command line. You can add data to this table as follows:
timer 1 \ | map (t: (t, processes()) \ | expand 1 \ | map (t, p: (t, p.pid, p.commandline)) \ | sql --commit 1000 'insert into process(%s, %s, %s)'
timer 1: Generate a timestamp every 1 second.
map (t: (t, processes()): For each timestamp t, generate a tuple containing t and the list of all current processes.
expand 1: Expand each (t, [p1, p2, ...]) tuple into distinct tuples for each process, (t, p1), (t, p2), ...
map (t, p: ...): Map each (t, process) tuple into (t, process pid, process commandline).
sql ...: Insert a tuple containing time, pid and command line into the process table, committing after every 1000 rows.