Open Source .NET (C#) Twitter Streaming API Client

April 26, 201060 Comments

UPDATE: Twitter removed basic authentication and converted to oAuth. This code will no longer work as written. 04/05/2013

It can take some time to write a client to consume data from the Twitter Streaming API.  Although this has been done a few times already, I wanted to build an application using only .NET code.  This means that the connection to Twitter is made using the HttpWebRequest object and the resulting JSON is parsed with a Data Contract.  There were quite a number of pitfalls along the way, so I hope this code will help you avoid some of the issues that I uncovered.  For example, HttpWebRequest hung if I didn’t abort the request prior to a close, and parsing JSON using .NET objects can be tricky unless you’ve done it before.

The code can be found on GitHub:  https://github.com/swhitley/TwitterStreamClient

Run the application with the “/encrypt {password}” parameters to generate the encrypted password for the app.config file.

    <appSettings>
        <add key="loglevel" value="ALL"/>
        <add key="use_queue" value="true"/>
        <add key="multithread" value="false"/>
        <add key="twitter_username" value="{Twitter Username"/>
        <add key="twitter_password_encrypted" value="{Encrypted Twitter Password"/>
        <add key="stream_url" value="http://stream.twitter.com/1/statuses/sample.json"/>
    </appSettings>

This is a console application that accepts command line arguments.  Use “/?” to see the options.  You can execute “twitterstreamclient” without any arguments and the application will connect to Twitter using the supplied username and password.  If you have “use_queue” set to false, the output will only go to the screen and you’ll see the results of the stream fly by.

If you set “use_queue” to “true,” the messages will be written to MSMQ.  You can then run a second instance of the application with the “/p” command line argument.  That argument instructs the application to process the message queue.  Write additional code to store the messages in a database.  The current code will parse the JSON and write the string to the screen.

I’ve included the text of the main class below.  As you can see, I’ve tried to implement the recommended backoff procedures from Twitter’s documentation.  I’m looking forward to your feedback on this and want to see recommendations for improvement.  I know that these features are already built into some of the libraries out there, but if you’re like me, you don’t always want to be tied to a library and you’d rather have something that’s easy to tweak for your own needs.

using System;
using System.Text;
using System.IO;
using System.Net;
using System.Configuration;
using System.Threading;
using System.Diagnostics;
using System.Web;
using System.Reflection;
using System.Runtime.Serialization.Json;
using System.Messaging;
using System.Security.Cryptography;

namespace TwitterStreamClient
{
    public class TwitterStream
    {
        public void Stream2Queue()
        {
            string username = ConfigurationManager.AppSettings["twitter_username"];
            string password = Common.Decrypt( ConfigurationManager.AppSettings["twitter_password_encrypted"]);
            //Twitter Streaming API
            string stream_url = ConfigurationManager.AppSettings["stream_url"];

            HttpWebRequest webRequest = null;
            HttpWebResponse webResponse = null;
            StreamReader responseStream = null;
            MessageQueue q = null;
            string useQueue = ConfigurationManager.AppSettings["use_queue"];

            int wait = 250;
            string jsonText = "";

            Logger logger = new Logger();

            try
            {
                //Message Queue
                if (useQueue == "true")
                {
                    if (MessageQueue.Exists(@".\private$\Twitter"))
                    {
                        q = new MessageQueue(@".\private$\Twitter");
                    }
                    else
                    {
                        q = MessageQueue.Create(@".\private$\Twitter");
                    }
                }

                while (true)
                {

                    try
                    {
                        //Connect
                        webRequest = (HttpWebRequest)WebRequest.Create(stream_url);
                        webRequest.Credentials = new NetworkCredential(username, password);
                        webRequest.Timeout = -1;
                        webResponse = (HttpWebResponse)webRequest.GetResponse();
                        Encoding encode = System.Text.Encoding.GetEncoding("utf-8");
                        responseStream = new StreamReader(webResponse.GetResponseStream(), encode);

                        //Read the stream.
                        while (true)
                        {
                            jsonText = responseStream.ReadLine();
                            //Post each message to the queue.
                            if (useQueue == "true")
                            {
                                Message message = new Message(jsonText);
                                q.Send(message);
                            }

                            //Success
                            wait = 250;

                            //Write Status
                            Console.Write(jsonText);

                        }
                        //Abort is needed or responseStream.Close() will hang.
                        webRequest.Abort();
                        responseStream.Close();
                        responseStream = null;
                        webResponse.Close();
                        webResponse = null;

                    }
                    catch (WebException ex)
                    {
                        Console.WriteLine(ex.Message);
                        logger.append(ex.Message, Logger.LogLevel.ERROR);
                        if (ex.Status == WebExceptionStatus.ProtocolError)
                        {
                            //-- From Twitter Docs --
                            //When a HTTP error (> 200) is returned, back off exponentially.
                            //Perhaps start with a 10 second wait, double on each subsequent failure,
                            //and finally cap the wait at 240 seconds.
                            //Exponential Backoff
                            if (wait < 10000)
                            {
                                wait = 10000;
                            }
                            else
                            {
                                if (wait < 240000)
                                {
                                    wait = wait * 2;
                                }
                            }
                        }
                        else
                        {
                            //-- From Twitter Docs --
                            //When a network error (TCP/IP level) is encountered, back off linearly.
                            //Perhaps start at 250 milliseconds and cap at 16 seconds.
                            //Linear Backoff
                            if (wait < 16000)
                            {
                                wait += 250;
                            }

                        }
                    }
                    catch (Exception ex)
                    {
                        Console.WriteLine(ex.Message);
                        logger.append(ex.Message, Logger.LogLevel.ERROR);
                    }
                    finally
                    {
                        if (webRequest != null)
                        {
                            webRequest.Abort();
                        }
                        if (responseStream != null)
                        {
                            responseStream.Close();
                            responseStream = null;
                        }

                        if (webResponse != null)
                        {
                            webResponse.Close();
                            webResponse = null;
                        }
                        Console.WriteLine("Waiting: " + wait);
                        Thread.Sleep(wait);
                    }
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
                logger.append(ex.Message, Logger.LogLevel.ERROR);
                Console.WriteLine("Waiting: " + wait);
                Thread.Sleep(wait);
            }
        }

        public void QueueRead()
        {
            MessageQueue q;
            string multiThread = ConfigurationManager.AppSettings["multithread"];
            Logger logger = new Logger();

            try
            {
                if (MessageQueue.Exists(@".\private$\Twitter"))
                {
                    q = new MessageQueue(@".\private$\Twitter");
                }
                else
                {
                    Console.WriteLine("Queue does not exist.");
                    return;
                }

                while (true)
                {
                    Message message;
                    try
                    {
                        message = q.Receive();
                        message.Formatter =
                            new XmlMessageFormatter(new String[] { "System.String" });
                        if (multiThread == "true")
                        {
                            ThreadPool.QueueUserWorkItem(MessageProcess, message);
                        }
                        else
                        {
                            MessageProcess(message);
                        }
                    }
                    catch { continue; }
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
                logger.append(ex.Message, Logger.LogLevel.ERROR);
            }
        }

        public void MessageProcess(object objMessage)
        {
            status status = new status();
            Logger logger = new Logger();
            DataContractJsonSerializer json = new DataContractJsonSerializer(status.GetType());

            try
            {
                Message message = objMessage as Message;

                byte[] byteArray = Encoding.UTF8.GetBytes(message.Body.ToString());
                MemoryStream stream = new MemoryStream(byteArray);

                //TODO:  Check for multiple objects.
                status = json.ReadObject(stream) as status;

                Console.WriteLine(message.Body.ToString());

                //TODO: Store the status object
                DataStore.Add(status);

            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
                logger.append(ex.Message, Logger.LogLevel.ERROR);
            }
        }
    }
}

Technorati Tags: ,,,,
Share

58 Responses to “Open Source .NET (C#) Twitter Streaming API Client”

  1. Please let me know if you’re looking for a author for your
    blog. You have some really great articles and I
    believe I would be a good asset. If you ever want to take some of the load off,
    I’d love to write some material for your blog in exchange for a
    link back to mine. Please send me an email if interested.
    Thank you!

  2. Buster says:

    When someone writes an post he/she keeps the plan of a user in his/her mind that how a user can understand it.
    Thus that’s why this aticle is great. Thanks!

    Feel free to surf to my homepage … family room
    decorating (Buster)

  3. Just desire to say your article is as surprising.

    The clarity in your put up is just great and that i can assume you are an expert
    on this subject. Fine together with your permission let me to clutch
    your RSS feed to stay up to date with impending post. Thanks 1,000,000 and please continue the enjoyable work.

  4. Otras herramientas pueden suministrar datos transaccionales,
    mas los gestores deben acudir a otros sistemas y medios (e mail teléfono) para actuar.

Leave a Reply

Twitter Tweet This

2 Trackbacks

  1. eclipsed4utoo (Ryan Alford)

    Twitter Comment


    RT @swhitley: “Open Source .NET (C#) Twitter Streaming API Client” [new post] – [link to post]

    Posted using Chat Catcher

  2. swhitley (Shannon Whitley)

    Twitter Comment


    “Open Source .NET (C#) Twitter Streaming API Client” [new post] – [link to post]

    Posted using Chat Catcher