stomp protokoll, http://activemq.apache.org/stomp.html

Begonnen von amos, 27. November 2013, 16:59:45

Vorheriges Thema - Nächstes Thema

0 Mitglieder und 1 Gast betrachten dieses Thema.

amos

Hallo, wie müsste ich grundsätzlichvorgehen um ein ein weiteres Protokoll (stomp, port 61613) zu implementieren.
Das Ziel wäre, dass unterschiedliche Wago Feldbuskontroller ihre Messwerte zur asyncronen Verarbeitung zentral in der activemq queue ablegen können und jeweils die Steuerung die gerade nichts mehr zu arbeiten hat, von dort den nächsten Messauftrag in der Warteschlange entgegen nehmen kann. Oder gibt es eventuell noch ander, bessere Konzepte zur Lastverteilung?

Viele Grüße
amos

Hier ein Client Beispiel in C:
#include <stdlib.h>
#include "stomp.h"
int die(int exitCode, const char *message, apr_status_t reason) {
   char msgbuf[80];
        apr_strerror(reason, msgbuf, sizeof(msgbuf));
        fprintf(stderr, "%s: %s (%d)\n", message, msgbuf, reason);
        exit(exitCode);
        return reason;
}
static void terminate()
{
  apr_terminate();
}
int main(int argc, char *argv[])
{
  apr_status_t rc;
  apr_pool_t *pool;
  stomp_connection *connection;
  setbuf(stdout, NULL);
  rc = apr_initialize();
        rc==APR_SUCCESS || die(-2, "Could not initialize", rc);
  atexit(terminate);   
  rc = apr_pool_create(&pool, NULL);
        rc==APR_SUCCESS || die(-2, "Could not allocate pool", rc);
  fprintf(stdout, "Connecting......");
  rc=stomp_connect( &connection, "activemq_server", 61613, pool);
        rc==APR_SUCCESS || die(-2, "Could not connect", rc);
  fprintf(stdout, "OK\n");
  fprintf(stdout, "Sending connect message.");
  {
     stomp_frame frame;
     frame.command = "CONNECT";
     frame.headers = apr_hash_make(pool);
     apr_hash_set(frame.headers, "login", APR_HASH_KEY_STRING, "hchirino");         
     apr_hash_set(frame.headers, "passcode", APR_HASH_KEY_STRING, "letmein");         
     frame.body = NULL;
     rc = stomp_write(connection, &frame);
     rc==APR_SUCCESS || die(-2, "Could not send frame", rc);
  } 
  fprintf(stdout, "OK\n");   
  fprintf(stdout, "Reading Response.");
  {
     stomp_frame *frame;
     rc = stomp_read(connection, &frame, pool);
     rc==APR_SUCCESS || die(-2, "Could not read frame", rc);
     fprintf(stdout, "Response: %s, %s\n", frame->command, frame->body);
  }     
  fprintf(stdout, "OK\n");
  fprintf(stdout, "Sending Subscribe.");
  {
     stomp_frame frame;
     frame.command = "SUB";
     frame.headers = apr_hash_make(pool);
     apr_hash_set(frame.headers, "destination", APR_HASH_KEY_STRING, "/queue/FOO.BAR");     
     frame.body = NULL;
     rc = stomp_write(connection, &frame);
     rc==APR_SUCCESS || die(-2, "Could not send frame", rc);
  } 
  fprintf(stdout, "OK\n");
  fprintf(stdout, "Sending Message.");
  {
     stomp_frame frame;
     frame.command = "SEND";
     frame.headers = apr_hash_make(pool);
     apr_hash_set(frame.headers, "destination", APR_HASH_KEY_STRING, "/queue/FOO.BAR");
     frame.body = "This is the message";
     rc = stomp_write(connection, &frame);
     rc==APR_SUCCESS || die(-2, "Could not send frame", rc);
  } 
  fprintf(stdout, "OK\n");
  fprintf(stdout, "Reading Response.");
  {
     stomp_frame *frame;
     rc = stomp_read(connection, &frame, pool);
     rc==APR_SUCCESS || die(-2, "Could not read frame", rc);
     fprintf(stdout, "Response: %s, %s\n", frame->command, frame->body);
  }     
  fprintf(stdout, "OK\n");
  fprintf(stdout, "Sending Disconnect.");
  {
     stomp_frame frame;
     frame.command = "DISCONNECT";
     frame.headers = NULL;
     frame.body = NULL;
     rc = stomp_write(connection, &frame);
     rc==APR_SUCCESS || die(-2, "Could not send frame", rc);
  } 
  fprintf(stdout, "OK\n");
  fprintf(stdout, "Disconnecting...");
        rc=stomp_disconnect(&connection);
        rc==APR_SUCCESS || die(-2, "Could not disconnect", rc);
  fprintf(stdout, "OK\n");
  apr_pool_destroy(pool);         
  return 0;
}