[This article was first published on Adventures in Statistical Computing, and kindly contributed to R-bloggers].
Interactive Brokers is a discount brokerage that provides a good API for programmatically accessing their platform. The purpose of this post is to create an application that will capture tick-level data and save that data into a database for future use.
I started to use the IBrokers package in R to do this post. However, as R is NOT easily threaded and the IB API is heavily threaded, well… oil and water.
Instead I went with the C# port of the API from DinosaurTech. It’s good and it’s free.
Luckily, you do not need an account with Interactive Brokers for this project. They have a demo environment available on their webpage. The data is FAKE, but it’s good enough to test connectivity. Simply run the demo, then go to Configure->API->Settings and ensure that “Enable ActiveX and Socket Clients” is checked and the port is set to 7496.
To follow along in this post, you will need:
- MySQL, MySQL Workbench, and MySQL Connector for .NET (all available here).
- VS2010 with C#. You can download the free Express version here.
- The libraries from DinosaurTech available at the link above.
- A basic understanding of C# (available here [Pro C# 2010 and the .NET 4 Platform], here [Beginning Visual C# 2010 (Wrox Programmer to Programmer)], or here [Google]).
To start, install all of the above items. In MySQL Workbench, run the following SQL to create the table to store ticks.
You will note that we are partitioning the table by date. The goal is that as the data grows, day over day, we can still query quickly.

    SET @OLD_UNIQUE_CHECKS=@@UNIQUE_CHECKS, UNIQUE_CHECKS=0;
    SET @OLD_FOREIGN_KEY_CHECKS=@@FOREIGN_KEY_CHECKS, FOREIGN_KEY_CHECKS=0;
    SET @OLD_SQL_MODE=@@SQL_MODE, SQL_MODE='TRADITIONAL';
    DROP SCHEMA IF EXISTS `tick_db` ;
    CREATE SCHEMA IF NOT EXISTS `tick_db` DEFAULT CHARACTER SET latin1 COLLATE latin1_swedish_ci ;
    USE `tick_db` ;
    -- -----------------------------------------------------
    -- Table `ticks`
    -- -----------------------------------------------------
    DROP TABLE IF EXISTS `ticks` ;
    CREATE TABLE IF NOT EXISTS `ticks` (
      `idticks` INT NOT NULL ,
      `symbol` VARCHAR(8) NOT NULL ,
      `date` DATE NOT NULL ,
      `time` TIME NOT NULL ,
      `value` FLOAT NOT NULL ,
      `type` VARCHAR(12) NOT NULL ,
      PRIMARY KEY (`idticks`, `date`) )
    ENGINE = InnoDB PARTITION BY KEY(date) PARTITIONS 1;
    CREATE INDEX `Symbol` ON `ticks` (`symbol` ASC) ;
    SET SQL_MODE=@OLD_SQL_MODE;
    SET FOREIGN_KEY_CHECKS=@OLD_FOREIGN_KEY_CHECKS;
    SET UNIQUE_CHECKS=@OLD_UNIQUE_CHECKS;

If you do not have a user in the database, create one and give it permissions on this table (my user is dom and the password, a super secure dom123).
Before we go on, let’s discuss how the IB API works. Your program connects through their trading platform, TWS (Trader Workstation), which must be running on your computer, and the API uses TWS’s connection to the IB servers to make your requests. When you request market tick data for a stock, the data arrives whenever it arrives: there is no synchronous get() function. You ask for data and the API fires events when that data comes back. Because of this, you can have multiple requests running at the same time; you just have to keep straight which symbol’s data you are processing at any given time.
This threaded nature means that we can process a large number of requests. It also means that we need to free up the threads that are receiving data as quickly as possible; that is, they shouldn’t be writing to the database. Instead, the event handlers will drop ticks into a queue, and a separate thread will post them to the database as quickly as it can.
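To make the "handlers enqueue, one worker writes" idea concrete, here is a minimal sketch that uses .NET 4's BlockingCollection instead of a hand-rolled lock and Sleep() loop. It is not the approach used in this post's project. The class name WriteBehindQueue and the write callback are hypothetical; in this project the callback would be whatever performs the database insert.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical illustration: not part of the IB API or of this post's project.
public class WriteBehindQueue<T>
{
    // Thread-safe FIFO queue; Add() may be called from any thread.
    private readonly BlockingCollection<T> items = new BlockingCollection<T>();
    private readonly Task worker;

    // 'write' stands in for the slow operation (for example a database insert).
    public WriteBehindQueue(Action<T> write)
    {
        // The single consumer blocks until an item arrives, so no polling is needed.
        worker = Task.Factory.StartNew(() =>
        {
            foreach (T item in items.GetConsumingEnumerable())
                write(item);
        }, TaskCreationOptions.LongRunning);
    }

    // Called from the event handlers; returns as soon as the item is queued.
    public void Add(T item)
    {
        items.Add(item);
    }

    // Stop accepting items, let the worker drain what is left, then wait for it.
    public void Shutdown()
    {
        items.CompleteAdding();
        worker.Wait();
    }
}
```

The design choice here is that shutdown is expressed through CompleteAdding() rather than a shared stop flag, so the worker exits only after every queued item has been handed to the callback.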
Ticks are reported as updates to BID, BID_SIZE, ASK, ASK_SIZE, LAST, LAST_SIZE, VOLUME, HIGH, LOW, and CLOSE.
Here is the plan of action:
- Create a new console application in C#.
- Create a Tick class for holding the tick and give it a method that returns the SQL INSERT statement to put that tick into our database.
- Create a TickQueue class that will process ticks that are inserted into its Queue object as fast as possible.
- Create a TickMain class that will request all the market data and insert the tick events into the TickQueue.
- Call the TickMain class from a separate thread in the console application and watch the magic happen.
First, our Tick class:
This class is basic. We simply hold the information about the tick and provide a method that builds our SQL INSERT statement.

    using System;

    public class Tick
    {
        public String type = "";
        public decimal value = 0;
        public String date = System.DateTime.Now.ToString("yyyy-MM-dd");
        //"ff" is hundredths of a second (the original "ss.ss" repeated the seconds)
        public String time = System.DateTime.Now.ToString("H:mm:ss.ff");
        public String symbol = "";
        public Int32 index = 0;

        public Tick(String type, decimal value, String symbol, Int32 index)
        {
            this.type = type;
            this.value = value;
            this.symbol = symbol;
            this.index = index;
        }

        public Tick() { }

        //Build the SQL INSERT statement for this tick
        public String toInsert()
        {
            String output = "insert into ticks (idticks,symbol,date,time,value,type) values (" +
                index +
                ",'" + symbol +
                "', DATE('" + date +
                "'),TIME('" + time + "')," +
                value +
                ",'" + type + "')";
            return output;
        }
    }
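One thing to watch when building SQL by string concatenation: a decimal is converted to text using the current culture, so on a machine whose locale uses a decimal comma the value 1.5 would be written as 1,5 and the INSERT would no longer parse as intended. The sketch below builds the same statement text with the invariant culture. TickSqlFormat is a hypothetical helper, not part of the original project, and it does not escape quotes in the string values.

```csharp
using System;
using System.Globalization;

// Hypothetical helper: same INSERT text as the Tick class builds,
// but numbers are always formatted with '.' as the decimal separator.
public static class TickSqlFormat
{
    public static string ToInsert(int index, string symbol, string date,
                                  string time, decimal value, string type)
    {
        // string.Format with InvariantCulture ignores the machine's locale.
        return string.Format(CultureInfo.InvariantCulture,
            "insert into ticks (idticks,symbol,date,time,value,type) values (" +
            "{0},'{1}', DATE('{2}'),TIME('{3}'),{4},'{5}')",
            index, symbol, date, time, value, type);
    }
}
```

If the values could ever contain quotes or come from an untrusted source, a parameterized command is the safer route; for fixed ticker symbols and tick type names, the invariant formatting above addresses the locale issue only.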
Next is the TickQueue. This is more involved…
You will notice that I am not using threads in this object. The TickMain object will have the worker thread that calls the Run() method in TickQueue. Because multiple threads will be accessing the object, I’ve surrounded all access points to the actual queue with a lock() statement. That will ensure that only 1 thread at a time gets access.

    using System;
    using System.Collections.Generic;
    using MySql.Data.MySqlClient;

    public class TickQueue
    {
        // Internal queue of db inserts
        private Queue<Tick> queue = new Queue<Tick>();

        // Locking Object. Necessary as multiple threads will be
        // accessing this object simultaneously
        private Object qLockObj = new object();

        //Connection object to MySQL
        public MySqlConnection conn = null;

        //Flag to stop processing (volatile: set by one thread, read by another)
        public volatile bool stop = false;

        //Method to enqueue a write
        public void Add(Tick item)
        {
            //Lock for single access only
            lock (qLockObj)
            {
                queue.Enqueue(item);
            }
        }

        //Method to write items as they are enqueued
        public void Run()
        {
            int n = 0;
            //Loop while the stop flag is false
            while (!stop)
            {
                //Lock and get a count of the objects in the queue
                lock (qLockObj)
                {
                    n = queue.Count;
                }
                //If there are objects in the queue, then process them
                if (n > 0)
                {
                    process();
                }
                //Sleep for .1 seconds before looping again
                System.Threading.Thread.Sleep(100);
            }
            //When the shutdown flag is received, write any
            //values still in the queue and then stop
            Console.WriteLine("Shutting Down TickQueue; " + queue.Count + " items left");
            process();
        }

        //Method to process items in the queue
        private void process()
        {
            List<Tick> inserts = new List<Tick>();
            //Move everything currently in the queue into a list.
            //(Counting up to queue.Count while dequeuing would stop halfway,
            //because Count shrinks with every Dequeue.)
            lock (qLockObj)
            {
                while (queue.Count > 0)
                    inserts.Add(queue.Dequeue());
            }
            Console.WriteLine("Processing " + inserts.Count + " items into database");
            //call insert for each item.
            foreach (Tick t in inserts)
                insert(t);
        }

        //Method to insert a tick
        private void insert(Tick t)
        {
            using (MySqlCommand cmd = conn.CreateCommand())
            {
                cmd.CommandText = t.toInsert();
                try
                {
                    cmd.ExecuteNonQuery();
                }
                catch (Exception exp)
                {
                    Console.WriteLine("OOPS " + exp.Message);
                }
            }
        }
    }
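Draining a queue under a lock is easy to get subtly wrong: comparing a growing counter against a shrinking Count stops around the halfway mark. As a small sketch of one way to sidestep the counter entirely, the helper below copies the whole queue and clears it in a single locked step. QueueDrain and DrainAll are hypothetical names used for illustration only.

```csharp
using System.Collections.Generic;

// Hypothetical helper, not part of the original project.
public static class QueueDrain
{
    // Removes and returns every item currently in the queue, in FIFO order.
    // The lock is held only for the copy, so slow work (such as database
    // writes) can then run on the returned list without blocking producers.
    public static List<T> DrainAll<T>(Queue<T> queue, object lockObj)
    {
        lock (lockObj)
        {
            List<T> items = new List<T>(queue); // Queue<T> enumerates oldest first
            queue.Clear();
            return items;
        }
    }
}
```

Producers must take the same lock object when they enqueue, exactly as Add() does in TickQueue.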
I’ve tried to comment enough to give an idea of what is happening in there. If not, let me know and I will expand. The same goes for TickMain below.
Now the TickMain class:
Stock requests are given a unique ID. The tick events carry this ID, not the symbol, so we keep track of the ID and symbol pairs in a Dictionary. You will note that we have a BackgroundWorker in here that calls and runs the database write queue. The tick event handlers process each tick, assign it a unique index ID (another source of possible thread contention, hence the lock() around the index creation), and add it to the queue.

    using System;
    using System.Collections.Generic;
    using System.ComponentModel;
    using MySql.Data.MySqlClient;
    //Also add the using directives for the DinosaurTech IB library

    public class TickMain
    {
        //Private and public accessor for the IBClient object
        private IBClient _client = null;
        public IBClient client
        {
            get { return _client; }
            set
            {
                _client = value;
                //If the client is connected, then set the queue.stop = false
                if (_client.Connected)
                    queue.stop = false;
                else
                    queue.stop = true;
            }
        }

        public List<String> stockList = new List<string>();
        public MySqlConnection conn = null;
        //volatile: set by the console thread, read by the worker thread
        public volatile bool doGet = true;
        private TickQueue queue = new TickQueue();
        private BackgroundWorker bg = new BackgroundWorker();
        private Dictionary<int, String> tickId = new Dictionary<int, string>();
        private int tickIndex = 0;
        private object lockObj = new object();

        //Constructors
        public TickMain()
        {
            initialize();
        }

        public TickMain(IBClient client, List<String> stockList, MySqlConnection conn)
        {
            this.client = client;
            this.stockList = stockList;
            this.conn = conn;
            initialize();
        }

        //Initialization method
        private void initialize()
        {
            //Setup the background worker to run the queue
            bg.DoWork += new DoWorkEventHandler(bg_DoWork);
            //Connect to MySQL if we haven't already
            if (conn.State != System.Data.ConnectionState.Open)
                conn.Open();
            //Don't process the queue if not connected to IB
            if (!client.Connected)
                queue.stop = true;
            //Set the MySQL connection for the queue
            queue.conn = conn;
            //Get the next value of the queue index
            using (MySqlCommand cmd = conn.CreateCommand())
            {
                cmd.CommandText = "select coalesce(max(idticks),0) from ticks";
                MySqlDataReader Reader;
                Reader = cmd.ExecuteReader();
                Reader.Read();
                tickIndex = Reader.GetInt32(0) + 1;
                Reader.Close();
            }
        }

        //Method for getting market prices
        public void Run()
        {
            if (client.Connected)
            {
                //Set up the event handlers for the ticks
                client.TickPrice += new EventHandler<TickPriceEventArgs>(client_TickPrice);
                client.TickSize += new EventHandler<TickSizeEventArgs>(client_TickSize);

                //Initialize a counter for stock symbols
                int i = 1;

                //Start the queue
                bg.RunWorkerAsync();

                //Request market data for each stock in the stockList
                foreach (String str in stockList)
                {
                    tickId.Add(i, str);
                    client.RequestMarketData(i, new Equity(str), null, false, false);
                    i++;
                }

                //Hang out until told otherwise
                while (doGet)
                {
                    System.Threading.Thread.Sleep(100);
                }

                //Remove event handlers
                Console.WriteLine("Shutting Down TickMain");
                client.TickPrice -= new EventHandler<TickPriceEventArgs>(client_TickPrice);
                client.TickSize -= new EventHandler<TickSizeEventArgs>(client_TickSize);
                queue.stop = true;
            }
        }

        //Event handler for TickSize events
        void client_TickSize(object sender, TickSizeEventArgs e)
        {
            //Get the symbol from the dictionary
            String symbol = tickId[e.TickerId];
            int i = 0;

            //As this is asynchronous, lock and get the current tick index
            lock (lockObj)
            {
                i = tickIndex;
                tickIndex++;
            }

            //Create a tick object and enqueue it
            Tick tick = new Tick(EnumDescConverter.GetEnumDescription(e.TickType),
                e.Size, symbol, i);
            queue.Add(tick);
        }

        //Event Handler for TickPrice events
        void client_TickPrice(object sender, TickPriceEventArgs e)
        {
            //Get the symbol from the dictionary
            String symbol = tickId[e.TickerId];
            int i = 0;

            //As this is asynchronous, lock and get the current tick index
            lock (lockObj)
            {
                i = tickIndex;
                tickIndex++;
            }

            //Create a tick object and enqueue it
            Tick tick = new Tick(EnumDescConverter.GetEnumDescription(e.TickType),
                e.Price, symbol, i);
            queue.Add(tick);
        }

        //BackgroundWorker delegate to run the queue.
        private void bg_DoWork(object sender, DoWorkEventArgs e)
        {
            queue.Run();
        }
    }
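The lock() around the index creation works, and for a single integer counter the same guarantee can also be had without a lock. As a minimal sketch (TickIndexCounter is a hypothetical class, not part of the project), Interlocked.Increment hands out each index exactly once even when several event handlers call it at the same time.

```csharp
using System.Threading;

// Hypothetical alternative to "lock, read tickIndex, increment tickIndex".
public class TickIndexCounter
{
    private int next;

    // 'first' is the first index to hand out, e.g. max(idticks) + 1.
    public TickIndexCounter(int first)
    {
        next = first;
    }

    // Atomically advances the counter and returns the value it had before,
    // which matches the read-then-increment behavior of the locked version.
    public int Take()
    {
        return Interlocked.Increment(ref next) - 1;
    }
}
```

Each handler would then call Take() in place of the locked block; whether that is worth changing is a matter of taste, since the lock is held only briefly in either case.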
Finally, the Program class in the console application:
The application will run, scrolling updates on how many records are being written to the database, until you hit the Enter key. After that, the system shuts down and you are prompted to hit Enter once more to exit the application.

    using System;
    using System.Collections.Generic;
    using System.ComponentModel;
    using MySql.Data.MySqlClient;
    //Also add the using directives for the DinosaurTech IB library

    class Program
    {
        static MySqlConnection conn =
            new MySqlConnection("server=LOCALHOST;DATABASE=tick_db;USER=dom;PASSWORD=dom123");
        static TickMain main = null;

        static void Main(string[] args)
        {
            //Open the connections.
            conn.Open();
            IBClient client = new IBClient();
            client.Connect("localhost", 7496, 2);

            //List of stock ticks to get
            List<String> stockList = new List<string>();
            stockList.Add("GOOG");
            stockList.Add("SPY");
            stockList.Add("SH");
            stockList.Add("DIA");

            //Initialize the TickMain object
            main = new TickMain(client, stockList, conn);
            main.doGet = true;

            //Setup a worker to call main.Run() asynchronously.
            BackgroundWorker bg = new BackgroundWorker();
            bg.DoWork += new DoWorkEventHandler(bg_DoWork);
            bg.RunWorkerAsync();

            //Chill until the user hits enter then stop the TickMain object
            Console.ReadLine();
            main.doGet = false;

            //disconnect
            client.Disconnect();

            Console.WriteLine("Hit Enter to Continue");
            Console.ReadLine();
        }

        //Delegate for main.Run()
        static void bg_DoWork(object sender, DoWorkEventArgs e)
        {
            main.Run();
        }
    }
The entire VS2010 project and SQL for the table can be found here.
My plan is to set this up to run all next week with live data, pull it into R next weekend, and see what we can see.