Friday, December 27, 2013

Amtrak real-time train data

Recently the US passenger rail operator Amtrak announced a nationwide real-time map of its services.

That's great...until you try it.

The United States is big, even though the number of trains is fairly small. The number of Google map tiles is big. The polygons that show the routes are enormous. The nationwide train data is big.

That's a lot of data to load, and slow to display, if all I really want is real-time data on a single train (is my train late?).
 
After looking at the GETs emitted by all the (slow, unnecessary) JavaScript on that site, I figured out how to bypass it and pull just the table of current train status. It's 400 KB, but that's a lot less than the more than 1 MB it takes to go through the website and map.

https://www.googleapis.com/mapsengine/v1/tables/01382379791355219452-08584582962951999356/features?version=published&key=AIzaSyCVFeFQrtk-ywrUE0pEcvlwgCqS6TJcOW4&maxResults=250&callback=jQuery19106722136430546803_1388112103417&dataType=jsonp&jsonpCallback=retrieveTrainsData&contentType=application%2Fjson&_=1388112103419


The only parameters required are 'version' and 'key' (the key is different from the site cookie):

https://www.googleapis.com/mapsengine/v1/tables/01382379791355219452-08584582962951999356/features?version=published&key=AIzaSyCVFeFQrtk-ywrUE0pEcvlwgCqS6TJcOW4
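Since only those two parameters matter, the URL can also be assembled in Python 3 with the standard library's urllib.parse instead of being pasted by hand. A minimal sketch: the helper name features_url is hypothetical, and the table ID and key are simply the ones shown above.

```python
from urllib.parse import urlencode

BASE = "https://www.googleapis.com/mapsengine/v1/tables/"

def features_url(table_id, key, **extra):
    """Build a features URL with the two required parameters (version, key).

    Any extra keyword arguments (e.g. maxResults=250) are appended as
    additional query parameters.
    """
    params = {"version": "published", "key": key}
    params.update(extra)
    return BASE + table_id + "/features?" + urlencode(params)

url = features_url("01382379791355219452-08584582962951999356",
                   "AIzaSyCVFeFQrtk-ywrUE0pEcvlwgCqS6TJcOW4")
print(url)
```

Because urlencode() handles the escaping, optional parameters can be added as keyword arguments without worrying about stray & characters.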

So we can easily download the current status table using wget:

$ wget -O train_status 'https://www.googleapis.com/mapsengine/v1/tables/01382379791355219452-08584582962951999356/features?version=published&key=AIzaSyCVFeFQrtk-ywrUE0pEcvlwgCqS6TJcOW4&jsonpCallback=retrieveTrainsData'


Since parsing 400 KB of JSON in shell script would be awkward and annoying, let's switch to Python 3:

#!/usr/bin/python3
import httplib2

# Adjacent string literals are joined, so the URL contains no newlines
url = ("https://www.googleapis.com/mapsengine/v1/tables/"
       "01382379791355219452-08584582962951999356/features?version=published&"
       "key=AIzaSyCVFeFQrtk-ywrUE0pEcvlwgCqS6TJcOW4&jsonpCallback=retrieveTrainsData")

h = httplib2.Http(None)
resp, content = h.request(url, "GET")

Let's tell Python that the content is a JSON string, and iterate through the list of trains looking for train number 7. That's a handy train because each run takes over 24 hours, so it will always have a status.

import json
all_trains = json.loads(content.decode("UTF-8"))
for train in all_trains["features"]:
    if train["properties"]["TrainNum"] == "7":
        print(train["properties"]["TrainNum"])   # Do something with this train

After locating the train we care about, here is data about it:

# GeoJSON coordinates are ordered [longitude, latitude]
longitude    = train["geometry"]["coordinates"][0]
latitude     = train["geometry"]["coordinates"][1]
speed        = train["properties"]["Velocity"]
report_time  = train["properties"]["LastValTS"]
next_station = train["properties"]["EventCode"]


Let's find a station (FAR) along the route. This is a little tougher, because each Station property is itself a JSON-encoded string, so Python's json module hands it back as a string instead of a dict. We need to identify the Station properties and feed each one through the JSON parser (again).

for prop in train["properties"].keys():
    if "Station" in prop:
        sta = json.loads(train["properties"][prop])
        if sta["code"] == "FAR":
            # Schedule data (.get() returns None when a field is absent)
            station_time_zone        = sta.get('tz')
            scheduled_arrival_time   = sta.get('scharr')   # Only if a long stop
            scheduled_departure_time = sta.get('schdep')   # All stations

            # Past stations
            actual_arrival_time      = sta.get('postarr')
            actual_departure_time    = sta.get('postdep')
            past_station_status      = sta.get('postcmnt')

            # Future stations
            estimated_arrival_time   = sta.get('estarr')
            future_station_status    = sta.get('estarrcmnt')
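The double-decoding step can be wrapped in a small helper so the station lookup is reusable. A minimal sketch: the function name find_station is hypothetical, and the sample feature below is made up for illustration, using only property names seen in this post (TrainNum, a Station property holding a JSON string with code, schdep, postcmnt). It is not real Amtrak data.

```python
import json

def find_station(train, code):
    """Return the decoded station dict for `code`, or None if absent.

    Each property whose name contains "Station" holds a JSON-encoded
    string, so it has to go through json.loads() a second time.
    """
    for name, value in train["properties"].items():
        if "Station" in name:
            sta = json.loads(value)
            if sta.get("code") == code:
                return sta
    return None

# Made-up sample feature, for illustration only
sample = {
    "properties": {
        "TrainNum": "7",
        "Station1": json.dumps({"code": "FAR",
                                "schdep": "12/26/2013 03:35:00",
                                "postcmnt": "1 HR 4 MI LATE"}),
    }
}

sta = find_station(sample, "FAR")
if sta is not None:
    # Fields that aren't present (here 'estarr') come back as None
    print(sta.get("schdep"), sta.get("postcmnt"), sta.get("estarr"))
```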

The final Python script for Train 7 at FAR is:

#!/usr/bin/python3

import httplib2
import json

url = "https://www.googleapis.com/mapsengine/v1/tables/01382379791355219452-08584582962951999356/features?version=published&key=AIzaSyCVFeFQrtk-ywrUE0pEcvlwgCqS6TJcOW4&jsonpCallback=retrieveTrainsData"
h = httplib2.Http(None)
resp, content = h.request(url, "GET")

all_trains = json.loads(content.decode("UTF-8"))
for train in all_trains["features"]:
    if train["properties"]["TrainNum"] == "7":
        for prop in train["properties"].keys():
            if "Station" in prop:
                # Station data is a JSON string inside the JSON; decode it again
                sta = json.loads(train["properties"][prop])
                if sta["code"] == "FAR":
                    print('Train number {}'.format(train["properties"]["TrainNum"]))
                    if 'schdep' in sta:
                        print('Scheduled: {}'.format(sta['schdep']))
                    if 'postcmnt' in sta:
                        print("Status: {}".format(sta['postcmnt']))
                    if 'estarrcmnt' in sta:
                        print("Status: {}".format(sta['estarrcmnt']))

And the result looks rather like:

$ python3 Current\ train\ status.py 

Train number 7
Scheduled: 12/26/2013 03:35:00
Status: 1 HR 4 MI LATE

Train number 7
Scheduled: 12/27/2013 03:35:00




Two more items.

First, the Python script can be tweaked to show *all* trains for a station: just remove the train number filter. To make that useful, you might add some date/time handling, since some scheduled times can be a day in the future or in the past.
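One way to add that time handling is to parse the schdep strings (they appear in the output above as MM/DD/YYYY HH:MM:SS) with datetime.strptime and keep only the ones within some window of the current time. A minimal sketch: the helper name within_window and the 12-hour window are arbitrary choices, and it ignores the station's time zone (the tz field), so treat the window as approximate.

```python
from datetime import datetime, timedelta

TIME_FORMAT = "%m/%d/%Y %H:%M:%S"   # e.g. "12/26/2013 03:35:00"

def within_window(schdep, now, hours=12):
    """True if the scheduled time is within `hours` of `now`, either side.

    Time zones are ignored: both values are treated as naive local times.
    """
    scheduled = datetime.strptime(schdep, TIME_FORMAT)
    return abs(scheduled - now) <= timedelta(hours=hours)

now = datetime(2013, 12, 26, 10, 0, 0)   # fixed "now" for the example
for schdep in ["12/26/2013 03:35:00", "12/27/2013 03:35:00"]:
    print(schdep, within_window(schdep, now))
```

In a live script, now would come from datetime.now(), ideally converted to the station's time zone first.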

Second, the station information table has a similar static page: https://www.googleapis.com/mapsengine/v1/tables/01382379791355219452-17620014524089761219/features?&version=published&maxResults=1000&key=AIzaSyCVFeFQrtk-ywrUE0pEcvlwgCqS6TJcOW4&callback=jQuery19106722136430546803_1388112103417&dataType=jsonp&jsonpCallback=retrieveStationData&contentType=application%2Fjson&_=13881121034

As with the train status table, many of the parameters are unnecessary. This shorter URL works, too: https://www.googleapis.com/mapsengine/v1/tables/01382379791355219452-17620014524089761219/features?&version=published&key=AIzaSyCVFeFQrtk-ywrUE0pEcvlwgCqS6TJcOW4&callback=jQuery19106722136430546803_1388112103417




Sunday, December 22, 2013

Python http.server and upstart-socket-bridge

In a previous post, I showed how to make the Python 3 socketserver.TCPServer class compatible with upstart-socket-bridge by overriding the server_bind() and server_activate() methods.

Python's http.server module builds on socketserver. Let's see if we can similarly make http.server compatible with upstart-socket-bridge.

About http.server

The http.server module is intended to be a simple way to create webservers. Most of the module is devoted to classes that handle incoming and outgoing data. Only one class, HTTPServer, handles the networking stuff.




Example 1

Here is the first http.server.SimpleHTTPRequestHandler example in the Python documentation:

import http.server
import socketserver

PORT = 8000

Handler = http.server.SimpleHTTPRequestHandler

httpd = socketserver.TCPServer(("", PORT), Handler)

print("serving at port", PORT)
httpd.serve_forever()

When you run this code, and point a web browser to port 8000, the code serves the current working directory, with links to files and subdirectories.


Convert Example 1 to work with upstart-socket-bridge

The example server is only seven lines.

Line 3, the PORT, is no longer needed. Upstart will define the port.

Line 5, the socketserver.TCPServer line, will be the biggest change. We need to define a new class based on TCPServer, and override two methods. This is exactly what we did in the previous post.

Line 6, the print statement, can be deleted. When the job is started by Upstart, there is no display, so there is nowhere to print to. An important side effect of starting the job using Upstart is that the current working directory is / (root), unless you specify otherwise in the /etc/init config file that starts the job.

Because of the change to Line 5, the final result is a few lines longer...not bad, and now it works with upstart-socket-bridge.

import http.server
import socketserver
import os, socket       # Needed by server_bind()

class MyServer(socketserver.TCPServer):
    def server_bind(self):
        """ Replace the socket FD with the Upstart-provided FD"""
        fd = int(os.environ["UPSTART_FDS"])
        self.socket = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)

    def server_activate(self):
        pass            # Upstart takes care of listen()

Handler = http.server.SimpleHTTPRequestHandler
server = MyServer(None, Handler)
server.serve_forever()

As always, the script is triggered by an appropriate Upstart job.

Test the script, in this case, by pointing a web browser at the port specified in the Upstart job.




Example 2


This example is also from the Python http.server documentation:

def run(server_class=HTTPServer, handler_class=BaseHTTPRequestHandler):
    server_address = ('', 8000)
    httpd = server_class(server_address, handler_class)
    httpd.serve_forever()

It's not a standalone example, but instead an abstract example of how to use the module in a larger application.

Let's reformat it into a working example:

import http.server
server_address = ('', 8000)
handler_class=http.server.BaseHTTPRequestHandler
httpd = http.server.HTTPServer(server_address, handler_class)
httpd.serve_forever()

Now the server runs...sort of. Every request gets a 501 (Unsupported method) error, because BaseHTTPRequestHandler doesn't define a do_GET() method. Let's subclass the handler so it reads the path and gives a real, valid response. (Source)

import http.server

class MyHandler(http.server.BaseHTTPRequestHandler):
    def do_HEAD(self):
        self.send_response(200)
        self.send_header("Content-type", "text/html")
        self.end_headers()
    def do_GET(self):
        self.send_response(200)
        self.send_header("Content-type", "text/html")
        self.end_headers()
        print(self.wfile)
        content = ["<html><head><title>The Title</title></head>",
                   "<body>This is a test.<br />",
                   "You accessed path: ", self.path, "<br />",
                   "</body></html>"]
        self.wfile.write("".join(content).encode("UTF-8"))


server_address = ('', 8000)
handler_class=MyHandler
httpd = http.server.HTTPServer(server_address, handler_class)
httpd.serve_forever()

Okay, now this is a good working example of http.server. It shows how http.server hides all the networking complexity behind one simple HTTPServer line and focuses all your effort on content in the handler. It's clear how you would parse the URL input using self.path, and how you can create and send content using self.wfile.

Convert Example #2 to work with upstart-socket-bridge

There are two classes at work: http.server.BaseHTTPRequestHandler handles content and doesn't care about networking. http.server.HTTPServer handles networking and doesn't care about content.

Good news: HTTPServer is based on socketserver.TCPServer, which we already know how to patch!

The changes I made:

1) I want it to launch at each connection, exchange data once, and then terminate. Each connection will launch a separate instance. We no longer want the service to serve_forever().

httpd.serve_forever()    # Old
httpd.handle_request()   # New



2) Let's make it compatible with all three inits: sysvinit (daemon), Upstart, and systemd.

import http.server, os

if __name__ == "__main__":
    # A comment can't follow a backslash continuation, so use parentheses
    if ("UPSTART_FDS" in os.environ           # Upstart
            or "LISTEN_FDS" in os.environ):   # systemd
        httpd = MyServer(None, MyHandler)     # Need a custom Server class
        httpd.handle_request()                # Run once
    else:                                     # sysvinit
        server_address = ('', 8000)
        httpd = http.server.HTTPServer(server_address, MyHandler)
        httpd.serve_forever()                 # Run forever



3) Add a custom server class that overrides server_bind() and server_activate(). socketserver.TCPServer defines both methods, and http.server.HTTPServer replaces server_bind() with its own version. This is the secret sauce that makes Upstart compatibility work:

import http.server, socketserver, socket, os

class MyServer(http.server.HTTPServer):
    def server_bind(self):
        # Get the listening socket's File Descriptor from the init system
        if "UPSTART_FDS" in os.environ:
            fd = int(os.environ["UPSTART_FDS"])      # Upstart: the fd number itself
        else:
            fd = 3     # systemd: LISTEN_FDS is a count; passed fds start at 3
        self.socket = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)

        # Only http.server.CGIHTTPRequestHandler uses these
        host, port = self.socket.getsockname()[:2]
        self.server_name = socket.getfqdn(host)
        self.server_port = port

    def server_activate(self):
        pass           # Upstart (or systemd) already called listen()
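Upstart and systemd pass the socket differently, and it's worth spelling out. Upstart's UPSTART_FDS holds the file descriptor number itself. systemd's LISTEN_FDS holds a *count* of passed descriptors, which start at fd 3, and LISTEN_PID names the process they are meant for. A minimal sketch of a helper (the name inherited_fd is hypothetical) that takes the environment as a dict so it can be tested without either init system. It assumes a systemd socket unit with Accept=no, so fd 3 is a listening socket that still needs accept().

```python
import os

SD_LISTEN_FDS_START = 3   # systemd hands over inherited fds starting at 3

def inherited_fd(env, pid):
    """Return the listening-socket fd passed by Upstart or systemd, else None.

    env: a mapping like os.environ; pid: the current process id.
    """
    if "UPSTART_FDS" in env:
        return int(env["UPSTART_FDS"])           # Upstart: the fd number itself
    if "LISTEN_FDS" in env:
        if int(env.get("LISTEN_PID", "0")) != pid:
            return None                          # meant for another process
        if int(env["LISTEN_FDS"]) < 1:
            return None                          # no descriptors were passed
        return SD_LISTEN_FDS_START               # systemd: first passed fd
    return None                                  # sysvinit: bind() it ourselves

print("inherited fd:", inherited_fd(os.environ, os.getpid()))
```

A None result is the cue to fall back to the classic HTTPServer(('', 8000), ...) path.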



4) Add the Upstart job to monitor the port:

# /etc/init/socket-test.conf
description "upstart-socket-bridge test"
start on socket PROTO=inet PORT=8000 ADDR=127.0.0.1
setuid your_username               # Does not need to run as root
exec /usr/bin/python3 /tmp/socket-server.py



And the final product looks like:

#!/usr/bin/python3

import http.server, socketserver, socket, os

class MyServer(http.server.HTTPServer):
    """
    This class overrides two methods in the socketserver module:
        socketserver __init__ uses both server_bind() and server_activate()
    This class overrides one method in the http.server.HTTPServer class:
        HTTPServer uses both socketserver __init__ and its own custom
        server_bind()
    These overrides make it compatible with Upstart and systemd socket-bridges
    Warning: It won't bind() or listen() to a socket anymore
    """
    def server_bind(self):
        """
        Get the listening socket's File Descriptor from the init system
        instead of binding or listening to a socket.
        """
        if "UPSTART_FDS" in os.environ:
            fd = int(os.environ["UPSTART_FDS"])  # Upstart: the fd number itself
        else:
            fd = 3   # systemd: LISTEN_FDS is a count; passed fds start at 3
        self.socket = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)

        # From http.server:
        # http.server.CGIHTTPRequestHandler uses these.
        # Other handler classes don't use these.
        host, port = self.socket.getsockname()[:2]
        self.server_name = socket.getfqdn(host)
        self.server_port = port


    def server_activate(self):
        """
        This socketserver method calls listen(), so it needs to be overridden
        """
        pass


class MyHandler(http.server.BaseHTTPRequestHandler):
    """
    A very simple custom handler.
    It merely reads the URL and responds with the path.
    This shows how you read data from a GET, and send a response.
    """
    def do_HEAD(self):
        self.send_response(200)
        self.send_header("Content-type", "text/html")
        self.end_headers()
    def do_GET(self):
        self.send_response(200)
        self.send_header("Content-type", "text/html")
        self.end_headers()
        content = ["<html><head><title>The Title</title></head>",
                   "<body>This is a test.<br />",
                   "You accessed path: ", self.path, "<br />",
                   "</body></html>"]
        self.wfile.write("".join(content).encode("UTF-8"))


if __name__ == "__main__":
    if ("UPSTART_FDS" in os.environ              # Upstart
            or "LISTEN_FDS" in os.environ):      # systemd
        httpd = MyServer(None, MyHandler)        # Use fd to get connection
        httpd.handle_request()                   # Handle once, then terminate
    else:
        server_address = ('', 8000)              # sysvinit, classic bind()
        httpd = http.server.HTTPServer(server_address, MyHandler)
        httpd.serve_forever()


Test the service:

  • The Python 3 script and Upstart job both use Port 8000.
  • Save the Python 3 script. Make it executable.
  • Save the Upstart job. Change the Upstart job to Port 8001. Make sure it points to the Python 3 script.

Webserver daemon using sysvinit - run forever:
  • Run the Python 3 script.
  • Start a web browser, and point it to http://localhost:8000/test/string
  • The browser should show a response
  • Kill the Python 3 script

Web service using upstart - run once:

  • Don't start the Python 3 script. Upstart will do it for you.
  • Start a web browser, and point it to http://localhost:8001/test/string
  • The browser should show a response.
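The run-once path can also be exercised without installing the Upstart job, inside a single Python 3 process: a listening socket stands in for the one Upstart would create, its fd number goes into UPSTART_FDS, and http.client plays the browser. A minimal sketch: EchoPathHandler and fetch_once are hypothetical names, the port is whatever the OS picks, and the server class is a trimmed-down copy of MyServer that skips getfqdn() to avoid a DNS lookup.

```python
import http.client, http.server, os, socket, threading

class MyServer(http.server.HTTPServer):
    """Trimmed MyServer: take the listening socket from UPSTART_FDS."""
    def server_bind(self):
        fd = int(os.environ["UPSTART_FDS"])
        self.socket = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)
        host, port = self.socket.getsockname()[:2]
        self.server_name = host          # skip getfqdn() to avoid DNS
        self.server_port = port

    def server_activate(self):
        pass                             # our stand-in already called listen()

class EchoPathHandler(http.server.BaseHTTPRequestHandler):
    def do_GET(self):
        body = ("You accessed path: " + self.path).encode("UTF-8")
        self.send_response(200)
        self.send_header("Content-type", "text/plain")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, format, *args):
        pass                             # keep stderr quiet

def fetch_once(path):
    # Play Upstart's part: bind() and listen() on a free localhost port
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listener.bind(("127.0.0.1", 0))
    listener.listen(5)
    os.environ["UPSTART_FDS"] = str(listener.fileno())

    server = MyServer(None, EchoPathHandler)
    worker = threading.Thread(target=server.handle_request)   # run once
    worker.start()

    conn = http.client.HTTPConnection("127.0.0.1", listener.getsockname()[1])
    conn.request("GET", path)
    response = conn.getresponse()
    result = (response.status, response.read().decode("UTF-8"))
    conn.close()

    worker.join()
    server.server_close()
    listener.close()
    return result

print(fetch_once("/test/string"))
```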








Tuesday, December 10, 2013

Python socketserver and upstart-socket-bridge

In a previous post, I described how to use Upstart to monitor a port, trigger a Python script upon connection, and then let the Python application send and receive data over that port.

In a server application, Upstart can take over the bind() and listen() elements of the server daemon, sometimes eliminating the need for a daemon at all.

Here is an example of a Python socketserver, and how it changes when using Upstart.

Some complexity is reduced: the code is simpler, because it no longer needs to fork() or serve_forever(). Each connection triggers the application, so the application need only handle a single read()/write() cycle, then terminate.

Some complexity is increased: We need to override two methods of a socketserver class to make it upstart-socket-bridge compatible.




Here is the original socketserver.TCPServer serve_forever example, from the Python documentation:

import socketserver

class MyTCPHandler(socketserver.BaseRequestHandler):
    """
    The RequestHandler class for our server.

    It is instantiated once per connection to the server, and must
    override the handle() method to implement communication to the
    client.
    """

    def handle(self):
        # self.request is the TCP socket connected to the client
        self.data = self.request.recv(1024).strip()
        print("{} wrote:".format(self.client_address[0]))
        print(self.data)
        # just send back the same data, but upper-cased
        self.request.sendall(self.data.upper())

if __name__ == "__main__":
    HOST, PORT = "localhost", 9999

    # Create the server, binding to localhost on port 9999
    server = socketserver.TCPServer((HOST, PORT), MyTCPHandler)

    # Activate the server; this will keep running until you
    # interrupt the program with Ctrl-C
    server.serve_forever()


It's easy to test. Let's save the file as /tmp/socket-server-original.py

$ python3 /tmp/socket-server-original.py     # On Terminal #1

$ echo "Test String" | nc localhost 9999     # On Terminal #2
TEST STRING

The example works.




Now, let's adapt it for upstart-socket-bridge.

It takes a bit of reading through the socketserver source code to figure it out:

TCPServer's __init__ calls good old bind() and listen() using its own methods server_bind() and server_activate(), respectively.

Both of those methods can be overridden. Better yet, they use socket module calls (socket.bind() and socket.listen()) that we already know how to replace with socket.fromfd().

So we can get TCPServer to work by overriding the two methods like this:

import os, socket, socketserver

class MyServer(socketserver.TCPServer):
    """
    This class overrides two methods in socketserver.TCPServer
    and makes it Upstart-compatible
    """
    def server_bind(self):
        """ Replace the socket-created file descriptor (fd) with the Upstart-provided fd"""
        fd = int(os.environ["UPSTART_FDS"])
        self.socket = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)

    def server_activate(self):
        """This means listen(), but we don't want it to do anything"""
        pass


Changing the TCPServer class slightly changes the way we activate the server.
We must call the new class instead of TCPServer, and we no longer need the address, since bind() no longer does anything:

    server = socketserver.TCPServer((HOST, PORT), MyTCPHandler)  # OLD
    server = MyServer(None, MyTCPHandler)                        # NEW


The code that handles the request, generates the response, and sends the response needs one change. Printing on the terminal won't work, since Upstart doesn't use a terminal to start the process. So we'll delete the printing lines:

        print("{} wrote:".format(self.client_address[0]))
        print(self.data)





Finally, one more change. The script can quit after the response has been sent:

    server.serve_forever()      # OLD
    server.handle_request()     # NEW





When we put it all together, here is the new version that works with upstart-socket-bridge:

import socketserver, os, socket

class MyServer(socketserver.TCPServer):
    """
    This class overrides two methods in socketserver.TCPServer
    and makes it Upstart-compatible
    """
    def server_bind(self):
        """ Replace the socket FD with the Upstart-provided FD"""
        fd = int(os.environ["UPSTART_FDS"])
        self.socket = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)

    def server_activate(self):
        pass            # Upstart takes care of listen()


class MyTCPHandler(socketserver.BaseRequestHandler):
    """
    The RequestHandler class for our server.

    It is instantiated once per connection to the server, and must
    override the handle() method to implement communication to the
    client.
    """

    def handle(self):
        # self.request is the TCP socket connected to the client
        self.data = self.request.recv(1024).strip()

        # just send back the same data, but upper-cased
        self.request.sendall(self.data.upper())

if __name__ == "__main__":
    # Create the server using the file descriptor from Upstart
    server = MyServer(None, MyTCPHandler)

    # Run once, then terminate
    server.handle_request()


Let's save the file as /tmp/socket-server-new.py
We'll need to adapt (or create) /etc/init/socket-test.conf Upstart job file.

description "upstart-socket-bridge test"
start on socket PROTO=inet PORT=34567 ADDR=127.0.0.1
setuid myusername   # Use *your* user name. Doesn't need to be root
exec /usr/bin/python3 /tmp/socket-server-new.py

And let's give it a try. Upstart launches the script for us, so don't start it by hand (run manually, it would fail because UPSTART_FDS isn't set). Just connect from a terminal:

$ echo "Test String" | nc localhost 34567
TEST STRING

Overriding socketserver for upstart compatibility works.




After this test, cleanup is easy

$ sudo rm /etc/init/socket-test.conf
$ rm /tmp/socket-server-original.py /tmp/socket-server-new.py

Monday, December 2, 2013

upstart-socket-bridge

Upstart-socket-bridge is a lot like xinetd. They replace the need for some daemons by monitoring a port, and then launching the desired application when an inbound connection is detected. U-s-b is part of a family of Upstart services that replace many daemon monitoring and listening functions and hooks.

Unlike xinetd, services need to be customized (patched) to run with upstart-socket-bridge.

Documentation is quite sparse. Hence this blog post. That's not intended to criticize; it's really hard to write "good" documentation when you don't know the use cases or the experience level of the user. If you have experience with writing sockets in C, and understand what a file descriptor is and how to use one,  then the documentation is just fine. I didn't before I began this odyssey.




How do I make it work?

Here are three simple examples of how it works.
One uses shell script.
One uses Python3.
One uses C.



Hello, World! with shell script


This script is triggered by a connection to the port. The port is just a trigger; no data is exchanged over it.

1) Let's create a shell script called /tmp/test-script. This script merely prints the Upstart-related environment variables into a file.

#!/bin/sh
outfile=/tmp/outfile
date > $outfile            # Timestamp
printenv | grep UPSTART >> $outfile
exit 0


2)  Create an Upstart .conf, let's call it /etc/init/socket-test.conf

description "upstart-socket-bridge test"
start on socket PROTO=inet PORT=34567 ADDR=127.0.0.1  # Port 34567
setuid exampleuser                                    # Run as exampleuser, not root
exec /bin/sh /tmp/test-script                         # Launch the service


3)  Let's run it. Connect to the port using netcat.

$ nc localhost 34567
^C       # End the process using CTRL+C


4)  Look at the result. Hey, look, it's all the environment variables we need!

$ cat /tmp/outfile


5)  Clean up:

$ sudo rm /etc/init/socket-test.conf          # Disconnect the launch trigger
$ rm /tmp/test-script /tmp/outfile            # Delete the test service





"Hello, World" service in Python 3

(UPDATED: Thanks to Dmitrijs Ledkovs for getting this to work!)

It's a simple echo server, the Python version of the C service below. It requires two files, the application and the Upstart .conf. It demonstrates how a service uses the port connection both as a trigger and for exchanging data.


1) Let's create the Python 3 file. Let's call it /tmp/test-service.py

#!/usr/bin/python3
import os, socket

# Create a socket object from the file descriptor in the env var
sock = socket.fromfd(int(os.environ["UPSTART_FDS"]),
                     socket.AF_INET, socket.SOCK_STREAM)

# Accept the connection, create a connection file descriptor
conn, addr = sock.accept()

# Read
message = conn.recv(1024).decode('UTF-8')

# Manipulate data
reply = ("I got your message: " + message)

# Write (sendall() keeps sending until every byte is out)
conn.sendall(reply.encode('UTF-8'))

# Finish
conn.close()



2)  Create an Upstart .conf, let's call it /etc/init/socket-test.conf

description "upstart-socket-bridge test"
start on socket PROTO=inet PORT=34567 ADDR=127.0.0.1  # Port 34567
setuid exampleuser                                    # Run as exampleuser, not root
exec /usr/bin/python3 /tmp/test-service.py            # Launch the service


3) Let's run it. Connect to the port using netcat, and then type in a string.

$ nc localhost 34567
Hello, World!                       # You type this in. Server read()s it.
I got your message: Hello, World!   # Server response.  Server write()s it.


4) Cleanup is simple. Simply delete the two files.

$ sudo rm /etc/init/socket-test.conf         # Disconnect the bridge
$ rm /tmp/test-service.py                    # Delete the test service







"Hello, World!" service in C


It's a simple echo server, the C version of the Python service above. It requires two files, the application and the Upstart .conf. It demonstrates how a service uses the port connection both as a trigger and for exchanging data.

1)  Let's create a C file. Let's call it test-service.c

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>        // read(), write(), close()
#include <sys/socket.h>    // accept(), socklen_t
#include <netinet/in.h>    // struct sockaddr_in

int main()
{
    /* Read the UPSTART_FDS env var to get the socket fd */
    char *name = "UPSTART_FDS";
    char *env = getenv (name);       // Read the environment variable
    if (env == NULL) {
        fprintf(stderr, "UPSTART_FDS is not set; run this from Upstart\n");
        return 1;
    }
    int sock_fd = atoi(env);         // Socket file descriptor

    /* Don't need to do any of these normal socket tasks! Hooray!
    / int port_num;           
    / int sock_fd = socket(AF_INET, SOCK_STREAM, 0);  
    / memset((char *) &serv_addr, 0, sizeof(serv_addr));
    / serv_addr.sin_family = AF_INET;
    / serv_addr.sin_addr.s_addr = INADDR_ANY;
    / serv_addr.sin_port = htons(port_num);
    / struct sockaddr_in serv_addr
    / bind(sock_fd, (struct sockaddr *) &serv_addr, sizeof(serv_addr));
    / listen(sock_fd, 5)                                                 
    */

    /* Accept() the connection. Returns the second fd: 'conn_fd' */
    struct sockaddr_in cli_addr;
    socklen_t clilen = sizeof(cli_addr);
    int conn_fd = accept(sock_fd, (struct sockaddr *) &cli_addr, &clilen);
    if (conn_fd < 0) {
        perror("accept");
        return 1;
    }

    /* Service is active. Read-from and write-to the connection fd */
    char response[276] = "I got your message: ";
    char buffer[256];
    memset(buffer, 0, sizeof(buffer));
    read(conn_fd, buffer, sizeof(buffer) - 1);    // Leave room for the '\0'
    strcat(response, buffer);
    write(conn_fd, response, strlen(response));   // Write to conn_fd

    /* Close the connection fd. Socket fd can be reused */
    close(conn_fd);
    return 0;
}

2)  Compile it using gcc, and output the compiled application as an executable called test-service. I put mine in /tmp to make cleanup easier. If you're familiar with gcc, the important element is that there are no flags and no libraries:

gcc -o test-service test-service.c


3)  Create an Upstart .conf, let's call it /etc/init/socket-test.conf

description "upstart-socket-bridge test"
start on socket PROTO=inet PORT=34567 ADDR=127.0.0.1  # Port 34567
setuid exampleuser                                    # Run as exampleuser, not root
exec /tmp/test-service                                # Launch the service


4) Let's run it. Connect to the port using netcat, and then type in a string.

$ nc localhost 34567
Hello, World!                       # You type this in. Server read()s it.
I got your message: Hello, World!   # Server response.  Server write()s it.


5) Cleanup is simple. Simply delete the three files.

$ sudo rm /etc/init/socket-test.conf         # Disconnect the bridge
$ rm /tmp/test-service.c /tmp/test-service   # Delete the test service



How does it work?

Here is the oversimplified explanation. Each stream of data whizzing round inside your system is tracked by the kernel. That tracking, sort of like an index or a pointer, is called a file descriptor (fd). A few fds are reserved (0=stdin, 1=stdout, 2=stderr) and you run into these in shell scripting or cron jobs.

A pipe or port or socket is just a way to tell the kernel that a stream of data output from Application A should be input to Application B. Let's look at it another way, and add that fd definition: An fd identifies a data stream output from A and input to B. The pipe/socket/port is a way to express how you want the fd set up.

Now the gritty stuff: A socket/port can have a single server and multiple clients attached. The server bind()s the port, listen()s on the port for connections, and accept()s each connection from a client. Each connection to a client is issued another file descriptor.

That's two file descriptors: The first defines the port/socket in general, and the second defines the specific connection between one client and the server.
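A few lines of Python 3 make the two descriptors visible. A minimal sketch in which one process plays both roles, server and client, on a kernel-chosen localhost port: listen() happens on the first fd, and accept() hands back a second, different fd for the single connection.

```python
import socket

# fd #1: the listening socket (the part Upstart bind()s and listen()s for you)
listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listener.bind(("127.0.0.1", 0))       # port 0: let the kernel choose a free port
listener.listen(5)

# A client connects, playing the role netcat plays in the examples
client = socket.create_connection(listener.getsockname())

# fd #2: accept() returns a new socket for this one client connection
conn, addr = listener.accept()
listen_fd, conn_fd = listener.fileno(), conn.fileno()
print("listening fd:", listen_fd, " connection fd:", conn_fd)

# Data flows over the connection fd, not the listening fd
conn.sendall(b"hello\n")
reply = client.recv(1024)

for s in (conn, client, listener):
    s.close()
```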

Upstart tells your server application (since it's not actually serving, let's just call it the "service") the first file descriptor.
  • Your service doesn't start a daemon at boot.
  • Your service doesn't bind() to the socket/port. Upstart does that.
  • Your service doesn't listen() on the socket/port. Upstart does that.
  • Your service doesn't fork() for each connection. Upstart launches an instance of your service for each connection. Your service can terminate when the connection ends...if you want it to.
  • Your service does accept() the connection from a client, communicate using the resulting file descriptor, and end when the connection close()s.

Let's try it with the example above:
  1. Upstart and Service are running on Server. Client is running somewhere else - maybe it's also running on Server, maybe it's out on the network somewhere.
  2. The file /etc/init/socket-test.conf tells Upstart to monitor port #34567 on behalf of test-service application. As currently written, it will begin monitoring at boot and stop monitoring at shutdown.
  3. When Client --like netcat-- connect()s to port #34567, Upstart launches test-service application with a couple extra environment variables.
  4. test-service reads the environment variables, including the file descriptor (fd).
  5. test-service accept()s the connection on the file descriptor. This creates a second fd that Service can use to communicate.
  6. When Client and test-service are done communicating, they close() the connection fd.
  7. test-service can end. Upstart will restart it next time an inbound connection is received.
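The steps above can be imitated in Python 3 to see the mechanics from the outside. This is a rough stand-in for upstart-socket-bridge, not how Upstart is actually implemented: make_listener and launch_on_connect are hypothetical names, and the sketch only mirrors steps 2 to 5 (bind and listen, wait for a connection, launch the service with the fd in UPSTART_FDS, let the service accept()).

```python
import os, select, socket, subprocess, sys

def make_listener(host, port):
    """Do the bind() and listen() that Upstart normally does for the job."""
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    listener.bind((host, port))
    listener.listen(5)
    return listener

def launch_on_connect(listener, script):
    """Wait for an inbound connection, then start `script` with the
    listening socket's fd number in UPSTART_FDS. The script does the
    accept(). Returns the script's exit status."""
    select.select([listener], [], [])        # block until a client connects
    fd = listener.fileno()
    env = dict(os.environ, UPSTART_FDS=str(fd))
    # pass_fds keeps this fd open, at the same number, in the child process
    return subprocess.call([sys.executable, script], env=env, pass_fds=(fd,))
```

For example, listener = make_listener("127.0.0.1", 34567) followed by launch_on_connect(listener, "/tmp/test-service.py") should answer the same nc command as the real Upstart job, once.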




How do I make it work with a service I didn't write? (like my favorite Game Server or Media Server or Backup Server)

Maybe it will work, maybe it won't. There are a couple of issues to consider. I don't see an easy, non-coding solution because we're talking about changing the nature of these services.
  • Change from always-on to sometimes-on.
  • Change to (save and) quit when the connection is done instead of listen()ing for another connection. I don't see any Upstart trigger for a socket connection ending.
  • Might make some service code simpler. No longer need to fork() or bind().
  • Not portable to non-Upstart systems, so the daemon code remains. Adds a bit of code complexity and a new test case.
  • A different trigger (hardware, file change, etc.) might be a better trigger than a connection to a socket. Upstart has bridges for those, too.

Sunday, December 1, 2013

Handy Pipe and Socket Reference

I'm not going to explain sockets. There are too many good tutorials out there already.
Instead, I'm going to use sockets in a bunch of examples that build on each other.

Ordinary Pipes
Named Pipes/Sockets
Unix Domain Sockets
TCP Sockets/Ports
upstart-socket-bridge


The Simplest


Pipes (|) and redirection (>). We already know how to do that in the shell.
These pipes connect related processes. For example, the children of a single terminal are related.

It's hard to pipe from shell to Python (or the reverse). It's possible, just not easy the first couple of times you try.

Shell example:
$ echo Hello, Planet\! | sed 's/Planet/World/'
Hello, World!

Here is a good example of pipes in Python 3.
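As a minimal sketch (not necessarily what the linked example does), here is the shell pipeline above rebuilt with Python 3's standard subprocess module: echo's stdout is connected to sed's stdin through an anonymous pipe, exactly like the shell's |. It assumes echo and sed are on the PATH; the function name echo_to_sed() is just for illustration.

```python
import subprocess


def echo_to_sed():
    """Python version of: echo Hello, Planet! | sed 's/Planet/World/'"""
    echo = subprocess.Popen(["echo", "Hello, Planet!"], stdout=subprocess.PIPE)
    # Connect sed's stdin to the read end of the pipe that echo writes into
    sed = subprocess.Popen(["sed", "s/Planet/World/"],
                           stdin=echo.stdout, stdout=subprocess.PIPE)
    echo.stdout.close()            # our copy of the read end; lets echo see SIGPIPE if sed exits early
    output, _ = sed.communicate()  # wait for sed and collect everything it printed
    echo.wait()
    return output.decode("utf-8")
```

No shell is involved, so the ! needs no escaping here. echo_to_sed() returns the same "Hello, World!" line the shell example prints.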




Named Pipes


Slightly more complicated (reference). These use a pipe file that synchronizes the data transfer when both processes are ready. A -> Pipe -> B. This is handy for inter-process communication that does not need to run through dbus, and saves a bit of overhead.

Named pipes are a shared file. They can have many (anonymous) writers, but are normally used with only a single reader. Shell, Python, and C senders and listeners can interoperate (fairly) smoothly.

Named pipes are blocking. Start the reader first, and its open() will wait for the writer. Alternately, start the writer first, and it will wait for the reader. Once the writer closes the pipe, the reader sees end-of-file and also stops. So each message is one open/write/close on the sending side matched by one open/read/close on the receiving side; a second message needs a second round of both.

Named pipes in shell and C can be reused. In Python, a pipe's file object cannot be reused after close(); open() the pipe path again for the next message (or unlink() the pipe and remake it afresh).

Conventionally, the listener is expected to create the pipe. But it's not required.

Processes that use named pipes work in the following order:
  • Create the pipe (usually done by listener)
  • Open the pipe (both sides)
  • Write the pipe
  • Writer closes the pipe (allows the reader to see the data)
  • Listener reads the pipe
  • Listener closes the pipe
  • Delete the pipe (usually done by the listener)
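To see the whole order in one place, here is a minimal Python 3 sketch that runs every step in a single process, using a thread as the writer so the two blocking open() calls can meet. The temporary-directory location and the name fifo_round_trip() are assumptions for illustration; /tmp/testpipe works the same way.

```python
import os
import tempfile
import threading


def fifo_round_trip(message):
    """Walk through the named-pipe lifecycle, with a thread playing the writer."""
    path = os.path.join(tempfile.mkdtemp(), "testpipe")  # hypothetical location
    os.mkfifo(path)                       # create the pipe (listener's job)

    def writer():
        with open(path, "w") as pipe:     # open: blocks until a reader opens too
            pipe.write(message)           # write
        # leaving the with-block closes the pipe, so the reader sees end-of-file

    t = threading.Thread(target=writer)
    t.start()
    with open(path, "r") as pipe:         # open: blocks until a writer opens too
        data = pipe.read()                # read everything until the writer closes
    # leaving the with-block is the listener's close
    t.join()
    os.remove(path)                       # delete the pipe
    os.rmdir(os.path.dirname(path))
    return data
```

fifo_round_trip("Hello, World!") hands the string through the pipe and returns it unchanged.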


Creating and Deleting a Named Pipe:



Shell:

#!/bin/sh
mkfifo /tmp/testpipe        # Create pipe
rm /tmp/testpipe            # Delete pipe

Python3:

>>> import os
>>> os.mkfifo("/tmp/testpipe")    # Create pipe
>>> os.remove("/tmp/testpipe")    # Delete pipe 

C:

#include <sys/stat.h>   // mkfifo()
#include <unistd.h>     // unlink()
int main( ){
   int result = mkfifo ("/tmp/testpipe", S_IRUSR| S_IWUSR);  // Create Pipe (returns 0 on success)
   unlink ("/tmp/testpipe");                                 // Delete Pipe
   return result == 0 ? 0 : 1;
   }


Named Pipe Listeners:

 

Shell:

#!/bin/sh
pipe=/tmp/testpipe
trap "rm -f $pipe" EXIT
[ -p "$pipe" ] || mkfifo "$pipe"   # Create the pipe if it doesn't exist yet
while true
do
    read -r line <"$pipe"   # Read the pipe
    echo "$line"
done

Python3:

>>> import os
>>> pipe = open("/tmp/testpipe", "r")
>>> pipe.read()     # You won't read anything until the sender close()
'Hello, World!'     # Data from pipe
>>> pipe.close()

C:

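#include <fcntl.h>
#include <stdio.h>
#include <unistd.h>
int main()
{
    char buf[20];
    int fd = open("/tmp/testpipe", O_RDONLY);      // Open the pipe (waits for a writer)
    ssize_t n = read(fd, buf, sizeof(buf) - 1);    // Read the message, leaving room for '\0'
    if (n < 0) n = 0;                              // Treat a read error as an empty message
    buf[n] = '\0';                                 // Terminate the string for printf()
    printf("Received: %s\n", buf);                 // Display the message
    close(fd);                                     // Close the pipe
    return 0;
}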


Named Pipe Senders:


Shell:

#!/bin/sh
pipe=/tmp/testpipe
[ ! -p $pipe ] && echo "Reader not running" && exit 1
[ "$1" ] && echo "$1" >$pipe || echo "Hello from $$" >$pipe

Python3:

>>> import os
>>> pipe = open("/tmp/testpipe", "w") # Blocks here until the receiver opens the pipe
>>> pipe.write("Hello, World!")
13
>>> pipe.close()    # Flushes the data; the receiver's read() returns after this

C:

#include <fcntl.h>
#include <unistd.h>
int main()
{
    int fd = open("/tmp/testpipe", O_WRONLY);   // Open the pipe
    write(fd, "Hi", sizeof("Hi"));              // Write "Hi" to pipe
    close(fd);                                  // Close the pipe - allow the read
    return 0;
}





Unix Domain Sockets


Unix Domain Sockets is a fancy name for a fancy pipe. Like named pipes, Unix sockets use a file in the filesystem...well, it *looks* like a pipe file. Actually, the data is buffered by the kernel instead of written to that file. The file is simply an easy way for the processes to find each other.

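Unlike pipes, Unix sockets are bi-directional, and multiple writers are not anonymous. In Python, socket objects are single-use: after a close() they cannot be reopened, so create a new socket (and, on the server side, delete the old socket file before bind()ing again). In the Python examples here, a pipe's read() returns only after the sender close()s, but a socket's recv() returns as soon as data from a send() arrives. (With SOCK_STREAM, several quick send()s can also arrive together in a single recv().)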

Since sockets are bi-directional, it's not a sender/receiver relationship anymore. It's a client/server relationship.

Processes using a Unix Domain Socket usually work in the following order:
  • Create a file descriptor (fd) that defines the socket OR look up the fd of an existing socket.
  • Server binds to the socket
  • Server listens on the socket
  • Client connects to the socket
  • Server accepts the connection
  • Both sides read/write
  • Client closes socket (in Python, a closed socket object cannot be reopened; create a new one and reconnect)
  • Server closes socket (usually during service shutdown)
  • Delete/unlink the socket file (if it won't be reused)
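Here is a minimal Python 3 sketch of those steps in a single process, with a thread playing the server so the bi-directional exchange is visible: the client sends a message and reads the server's reply on the same connection. The temporary socket path, the name unix_socket_round_trip(), and the upper-casing reply are all hypothetical choices for illustration.

```python
import os
import socket
import tempfile
import threading


def unix_socket_round_trip(message):
    """Run the Unix domain socket steps in one process; a thread plays the server."""
    path = os.path.join(tempfile.mkdtemp(), "sock")   # hypothetical path; /tmp/sock works too
    server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    server.bind(path)                   # bind() creates the socket file
    server.listen(1)

    def serve():
        conn, _ = server.accept()       # wait for the client's connect()
        with conn:
            data = conn.recv(1024)      # read the client's message...
            conn.sendall(data.upper())  # ...and answer on the same connection

    t = threading.Thread(target=serve)
    t.start()
    with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as client:
        client.connect(path)
        client.sendall(message.encode("utf-8"))
        reply = client.recv(1024).decode("utf-8")
    t.join()
    server.close()                      # server closes its socket
    os.remove(path)                     # unlink the socket file
    os.rmdir(os.path.dirname(path))
    return reply
```

unix_socket_round_trip("Hello, World") comes back as "HELLO, WORLD", which a one-way pipe could not do without a second pipe.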

Creating and Deleting a Unix Domain Socket:


Shell:

Shell creation can be automatic, for example using netcat:
$ nc -lU /tmp/sock           # Create socket, among other actions
$ rm /tmp/sock               # Delete socket

Python3:

>>> import socket, os
>>> tempsocket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)  
>>> tempsocket.bind("/tmp/sock")  # Create socket
>>> os.remove("/tmp/sock")        # Delete socket 

C:

#include <string.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <unistd.h>
int main() {
  char *path = "/tmp/sock";
  struct sockaddr_un addr;
  int fd = socket(AF_UNIX, SOCK_STREAM, 0);             // Create a file descriptor
  memset(&addr, 0, sizeof(addr));                       // Zero the address first (prevent garbage)
  addr.sun_family = AF_UNIX;                            // It's a Unix socket
  strncpy(addr.sun_path, path, sizeof(addr.sun_path) - 1);   // Set the path
  bind(fd, (struct sockaddr*)&addr, sizeof(addr));      // Create socket file (server)
  // A client would instead use the same addr with:
  // connect(fd, (struct sockaddr*)&addr, sizeof(addr));
  close(fd);
  unlink(path);                                         // Delete socket file
  return 0;
}


Unix Domain Socket Servers (Bind to socket):

The server owns (binds) and monitors the socket, and is usually responsible for creating and deleting the socket. (C example)

Shell:

$ nc -lU /tmp/sock

Python3:

>>> import socket
>>> tempsocket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
>>> tempsocket.bind("/tmp/sock")       # Listener is first process using socket
>>> tempsocket.listen(1)
>>> conn, addr = tempsocket.accept()   # Listener blocks here awaiting sender's connect()
>>> conn.recv(1024).decode('UTF-8')    # Listener blocks here awaiting send()
'Hello, World'                         # Try this a few times to capture each send()
>>> tempsocket.close()

C:

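#include <stdio.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <unistd.h>
int main() {
  char buf[100];
  int cl,rc;
  struct sockaddr_un addr;
  int fd = socket(AF_UNIX, SOCK_STREAM, 0);      // Create the socket, as in the example above
  memset(&addr, 0, sizeof(addr));
  addr.sun_family = AF_UNIX;
  strncpy(addr.sun_path, "/tmp/sock", sizeof(addr.sun_path) - 1);
  unlink("/tmp/sock");                           // Remove a stale socket file, or bind() fails
  bind(fd, (struct sockaddr*)&addr, sizeof(addr));
  listen(fd, 5);                                 // listen() is the step after bind()
  while (1) {
    cl = accept(fd, NULL, NULL);                 // Wait for a client to connect()
    if (cl == -1) continue;
    while ( (rc=read(cl,buf,sizeof(buf))) > 0) {
      printf("read %d bytes: %.*s\n", rc, rc, buf);
    }
    if (rc == 0) printf("EOF\n");                // Client closed its end
    close(cl);
  }
  return 0;
}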



Unix Domain Socket Clients (Connect to socket):

Clients don't own (bind) the socket. Instead, they connect on-demand.

Shell:

$ nc -U /tmp/sock

Python3:

>>> import socket
>>> tempsocket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
>>> tempsocket.connect("/tmp/sock")           # Listener must be active before sender
>>> data = "Hello, World".encode("utf-8")
>>> tempsocket.send(data)                     # Try this a few times
12
>>> tempsocket.close()                        # End of stream

C:

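#include <string.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <unistd.h>
int main() {
  struct sockaddr_un addr;
  char buf[100];
  int rc;
  int fd = socket(AF_UNIX, SOCK_STREAM, 0);             // Create a file descriptor
  memset(&addr, 0, sizeof(addr));
  addr.sun_family = AF_UNIX;
  strncpy(addr.sun_path, "/tmp/sock", sizeof(addr.sun_path) - 1);
  if (connect(fd, (struct sockaddr*)&addr, sizeof(addr)) == -1)   // Server must already be listening
    return 1;

  // First step after connect(): copy keyboard input to the socket until EOF (Ctrl-D)
  while( (rc=read(STDIN_FILENO, buf, sizeof(buf))) > 0) write(fd, buf, rc);
  close(fd);
  return 0;
}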



Quick fun with netcat:

The easiest possible bi-directional socket.
$ nc -lU /tmp/sock   # Enter in Terminal #1
$ nc -U /tmp/sock    # Enter in Terminal #2

Type anything on either terminal.
Look! The two communicate!

Here is an example of using netcat to exchange files this way (from the man page)
$ nc -lU /tmp/sock > filename.out      # Terminal #1 (receiver)
$ nc -U /tmp/sock < filename.in        # Terminal #2 (sender)





TCP Sockets (Ports)


TCP ports are very much like Unix sockets: the data stream is buffered, but there is no socket file. Unix sockets only work within a single machine, but TCP ports are a foundation of the Internet. Ports use client/server, and the server owns (binds) the port that the client connects to. Sockets and Ports use the same shell application (netcat) and Python3 module (socket).

Since there is no file to create or delete, bind() and connect() simply use an IP address and port number.

Processes using a TCP port usually work in the following order (just like a Unix Domain Socket):
  • Create a file descriptor (fd) that defines the socket
  • Server binds to the port
  • Server listens on the port
  • Client connects to the port
  • Server accepts the connection
  • Both sides read/write
  • Client closes its socket
  • Server closes its socket, releasing the port (usually during service shutdown)
  • Nothing to delete/unlink: unlike a Unix socket, a port is an address, not a file
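The same steps over TCP, as a minimal Python 3 sketch. Two details differ from the Unix socket case: bind() takes an (IP Addr, Port) tuple instead of a path, and nothing has to be deleted afterwards. Binding to port 0 asks the OS for any free port, and SO_REUSEADDR lets a restarted server re-bind a fixed port (like 45678) right away instead of waiting for old connections to time out. The name tcp_round_trip() and the "I got: " reply are hypothetical.

```python
import socket
import threading


def tcp_round_trip(message, host="127.0.0.1", port=0):
    """Server and client over TCP; port=0 lets the OS choose a free port."""
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # Allow an immediate re-bind of the same port after a restart
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind((host, port))               # Tuple: (IP Addr, Port); no file is created
    server.listen(1)
    actual_port = server.getsockname()[1]   # the port the OS actually assigned

    def serve():
        conn, _ = server.accept()           # wait for the client's connect()
        with conn:
            conn.sendall(b"I got: " + conn.recv(1024))

    t = threading.Thread(target=serve)
    t.start()
    with socket.create_connection((host, actual_port)) as client:
        client.sendall(message.encode("utf-8"))
        reply = client.recv(1024).decode("utf-8")
    t.join()
    server.close()                          # closing the listener releases the port
    return reply
```

tcp_round_trip("Hello, World") returns "I got: Hello, World". Pass port=45678 to match the examples below.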


TCP Port Services (Bind to port):

The server owns (binds) and monitors the port.

Shell:

$ nc -l 45678      # Port 45678

Python3:

>>> import socket
>>> tempsocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
>>> tempsocket.bind(("127.0.0.1", 45678))   # Tuple: (IP Addr, Port)
>>> tempsocket.listen(1)
>>> conn, addr = tempsocket.accept()   # Listener blocks here awaiting sender's connect()
>>> conn.recv(1024).decode('UTF-8')    # Listener blocks here awaiting send()
'Hello, World'                         # Try it a few times to catch each send()
>>> tempsocket.close()

C:

#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
int main()
{
     int sock_fd, conn_fd;   // Socket file descriptor and Connection file descriptor
     int port_num = 45678;
     socklen_t clilen;
     char buffer[256];
     struct sockaddr_in serv_addr, cli_addr;
     sock_fd = socket(AF_INET, SOCK_STREAM, 0);       // Define socket type
     memset(&serv_addr, 0, sizeof(serv_addr));        // Set array to zeroes
     serv_addr.sin_family = AF_INET;
     serv_addr.sin_addr.s_addr = INADDR_ANY;          // Accept connections on any interface
     serv_addr.sin_port = htons(port_num);
     bind(sock_fd, (struct sockaddr *) &serv_addr, sizeof(serv_addr));   // Bind
     listen(sock_fd,5);                               // Listen
     clilen = sizeof(cli_addr);
     conn_fd = accept(sock_fd,
                 (struct sockaddr *) &cli_addr,
                 &clilen);                            // Accept
     memset(buffer, 0, sizeof(buffer));               // Fill buffer with zeroes - prevent garbage
     read(conn_fd,buffer,255);                        // Read (the last byte stays '\0')
     printf("Here is the message: %s\n",buffer);
     write(conn_fd,"I got your message",18);          // Write
     close(conn_fd);
     close(sock_fd);                                  // Close
     return 0;
}


TCP Port Clients (Connect to port):

The client connects to the port on-demand.

Shell:

$ nc localhost 45678      # IP=loopback, Port #45678

Python3:

>>> import socket
>>> tempsocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
>>> tempsocket.connect(("127.0.0.1", 45678))   # Tuple: (IP Addr, Port)
>>> data = "Hello, World".encode("utf-8")
>>> tempsocket.send(data)          # Try this a few times
12
>>> tempsocket.close()

C:

#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netdb.h>
int main(int argc, char *argv[])
{
    int sock_fd;
    int port_num = 45678;
    struct sockaddr_in serv_addr;
    struct hostent *server;
    char buffer[256];
    sock_fd = socket(AF_INET, SOCK_STREAM, 0);       // Define socket type
    server = gethostbyname("localhost");             // Look up the server's IP address
    if (server == NULL) return 1;                    // Name lookup failed
    memset(&serv_addr, 0, sizeof(serv_addr));        // Set array to zeroes
    serv_addr.sin_family = AF_INET;
    memcpy(&serv_addr.sin_addr.s_addr,
           server->h_addr_list[0],
           server->h_length);                        // Copy the looked-up address
    serv_addr.sin_port = htons(port_num);
    connect(sock_fd,(struct sockaddr *) &serv_addr,sizeof(serv_addr)); // Connect
    printf("Please enter the message: ");
    memset(buffer, 0, sizeof(buffer));
    fgets(buffer,255,stdin);                // Keyboard input
    write(sock_fd,buffer,strlen(buffer));   // Write to server
    memset(buffer, 0, sizeof(buffer));      // Reuse buffer array - clear to prevent garbage
    read(sock_fd,buffer,255);               // Read response
    printf("%s\n",buffer);                  // Print response
    close(sock_fd);                         // Close
    return 0;
}


More fun with netcat

This is the basic way that webservers work.

$ echo "Hello, World" | nc -l 45678   # Enter in Terminal #1
$ nc localhost 45678                  # Enter in Terminal #2
                                      # OR open http://localhost:45678
                                      # in your favorite web browser
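A bare "Hello, World" line isn't a valid HTTP response, so some browsers may refuse to show it. Here is a minimal Python 3 sketch of what a web server adds on top of the netcat trick: read the request headers up to the blank line, then answer with a status line, headers, and the body. It's a hedged toy, not a real web server; the name serve_one_http_request() is hypothetical, and it handles exactly one request on a listener you have already bound and put into listen().

```python
import socket


def serve_one_http_request(listener):
    """Answer a single HTTP request on an already-listening TCP socket."""
    conn, _ = listener.accept()               # Block until a browser (or curl) connects
    with conn:
        request = b""
        while b"\r\n\r\n" not in request:     # Read until the blank line that ends the headers
            chunk = conn.recv(1024)
            if not chunk:                     # Client hung up early
                break
            request += chunk
        body = b"Hello, World\n"
        header = ("HTTP/1.0 200 OK\r\n"
                  "Content-Type: text/plain\r\n"
                  "Content-Length: {}\r\n"
                  "\r\n").format(len(body)).encode("ascii")
        conn.sendall(header + body)           # Status line + headers + body
    return request                            # The raw request, e.g. b"GET / HTTP/1.1..."
```

To try it by hand, bind a socket to ("127.0.0.1", 45678), call listen(1), pass it to serve_one_http_request(), and open http://localhost:45678 in your browser.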