[Rprotobuf-commits] r303 - java java/src/org/rproject/rprotobuf old pkg pkg/R pkg/src
noreply at r-forge.r-project.org
noreply at r-forge.r-project.org
Thu Apr 8 21:21:05 CEST 2010
Author: romain
Date: 2010-04-08 21:21:05 +0200 (Thu, 08 Apr 2010)
New Revision: 303
Added:
old/rpc_over_http.cpp
Removed:
pkg/src/rpc_over_http.cpp
Modified:
java/build.xml
java/src/org/rproject/rprotobuf/ProtobufHttpServer.java
java/start.sh
pkg/DESCRIPTION
pkg/R/rpc.R
pkg/src/rprotobuf.h
Log:
using RCurl for rpc over http
Modified: java/build.xml
===================================================================
--- java/build.xml 2010-04-08 07:49:53 UTC (rev 302)
+++ java/build.xml 2010-04-08 19:21:05 UTC (rev 303)
@@ -7,7 +7,7 @@
<property name="jar.dir" value="jar" />
<property name="lib.dir" value="lib" />
- <property name="protobuf.jar" value="${lib.dir}/protobuf-java-2.2.0.jar" />
+ <property name="protobuf.jar" value="${lib.dir}/protobuf-java-2.3.0.jar" />
<property name="generated.dir" value="generated" />
<path id="classpath">
Modified: java/src/org/rproject/rprotobuf/ProtobufHttpServer.java
===================================================================
--- java/src/org/rproject/rprotobuf/ProtobufHttpServer.java 2010-04-08 07:49:53 UTC (rev 302)
+++ java/src/org/rproject/rprotobuf/ProtobufHttpServer.java 2010-04-08 19:21:05 UTC (rev 303)
@@ -35,13 +35,13 @@
int port = DEFAULT_PORT ;
String root = DEFAULT_ROOT ;
- if( args != null && args.length >= 0 ){
+ if( args != null && args.length > 0 ){
try{
port = Integer.parseInt( args[0] ) ;
} catch( Exception e){ /* just use the default */ }
}
- if(args != null || args.length >= 1 ){
+ if(args != null && args.length > 1 ){
root = args[1];
}
Modified: java/start.sh
===================================================================
--- java/start.sh 2010-04-08 07:49:53 UTC (rev 302)
+++ java/start.sh 2010-04-08 19:21:05 UTC (rev 303)
@@ -1,4 +1,4 @@
#!/bin/bash
-java -cp lib/protobuf-java-2.2.0.jar:jar/RProtoBuf-java-http.jar org.rproject.rprotobuf.ProtobufHttpServer
+java -cp lib/protobuf-java-2.3.0.jar:jar/RProtoBuf-java-http.jar org.rproject.rprotobuf.ProtobufHttpServer 9002
Added: old/rpc_over_http.cpp
===================================================================
--- old/rpc_over_http.cpp (rev 0)
+++ old/rpc_over_http.cpp 2010-04-08 19:21:05 UTC (rev 303)
@@ -0,0 +1,238 @@
+#include "rprotobuf.h"
+
+#include <sys/types.h>
+#include <sys/socket.h>
+
+#include "sisocks.h"
+
+/* FIXME: this should probably be handled by sisocks;
+ we need it for the TCP_NODELAY socket option */
+#include <netinet/tcp.h>
+
+#define RESET(MSG) closesocket(socket_id); \
+ socket_id = - 1 ; \
+ THROW_SOCKET_ERROR( "socket error" ) ;
+
+#define LINE_BUF_SIZE 1024
+
+/* debug output - define DBG(X) as X to enable debugging output, or as empty to disable it */
+#define DBG(X) X
+
+
+namespace rprotobuf{
+
+ /**
+ * invoke an rpc method over http
+ */
+ SEXP invoke_method_http( SEXP method_xp, SEXP message_xp, SEXP host_xp, SEXP port_xp, SEXP root_xp ){
+
+ using namespace Rcpp ;
+
+ /* grab parameters */
+ XPtr<GPB::MethodDescriptor> method(method_xp) ;
+ XPtr<GPB::Message> input(message_xp) ;
+
+ int port = as<int>(port_xp);
+ std::string host = as<std::string>(host_xp) ;
+ std::string root = as<std::string>(root_xp) ;
+
+ /* setup the socket */
+ int socket_id = - 1 ;
+ SAIN sa ;
+ int opt = 1;
+ socket_id = socket( PF_INET, SOCK_STREAM, 0 ) ;
+ memset(&sa,0,sizeof(SAIN));
+ sa.sin_family=AF_INET;
+ sa.sin_port=htons(port);
+ sa.sin_addr.s_addr=(host.c_str()) ? inet_addr(host.c_str()) : htonl(INADDR_ANY);
+
+ if( setsockopt( socket_id , IPPROTO_TCP, TCP_NODELAY, &opt, sizeof(opt)) ){
+ RESET( "setting socket options") ;
+ }
+
+ int res = connect(socket_id, (SA*)&sa, sizeof(sa));
+ if( res==-1) {
+ RESET( "cannot connect" ) ;
+ }
+
+ /* building the HTTP request header */
+ char buf[33] ;
+ int input_size = input->ByteSize() ;
+ sprintf( buf, "%d", input_size ) ;
+ DBG(Rprintf( "input message size: %d\n", input_size )) ;
+
+ std::string header = "POST " ;
+ header += root ; /* we know root starts and ends with a / */
+ header += method->service()->full_name();
+ header += "?method=" ;
+ header += method->name() ;
+ header += " HTTP/1.1\r\nConnection: close\r\nHost: 127.0.0.1:9000\r\nContent-Type:application/x-protobuf\r\nContent-Length:" ;
+ header += buf ;
+ header += "\r\n" ;
+
+ /* send the header */
+ DBG(Rprintf( "sending http request\n" )) ;
+ int sent = 0 ;
+ int total_sent = 0;
+ int total = header.size() + 1 ;
+ const char* c_str = header.c_str() ;
+ char* p = (char*)c_str ;
+ while( total_sent < total ){
+ sent = send( socket_id, p, (total-total_sent), 0 ) ;
+ total_sent = total_sent + sent ;
+ DBG(Rprintf( "headers : sent %d bytes\n", total_sent )) ;
+ DBG(Rprintf( "==========================\n" )) ;
+ DBG(Rprintf( "%s\n", header.c_str() )) ;
+ DBG(Rprintf( "==========================\n" )) ;
+ p = p + sent ;
+ }
+
+ DBG(Rprintf( "sending body\n" )) ;
+
+ /* send the message payload */
+ char payload[ input_size ] ;
+ p = payload ;
+ input->SerializeToArray( payload, input_size );
+ total_sent = 0;
+ while( total_sent < input_size ){
+ sent = send( socket_id, p, (input_size-total_sent), 0 ) ;
+ total_sent = total_sent + sent ;
+ DBG(Rprintf( "input message : sent %d bytes\n", total_sent ) );
+ p = p + sent ;
+ }
+
+ DBG(Rprintf( "done\n" )) ;
+
+ /* make sure this is enough to read all the headers */
+ char buffer[LINE_BUF_SIZE] ;
+ char* response_body ;
+ int content_length = 0 ;
+ int success = 0 ;
+
+ DBG(Rprintf( "recv\n" )) ;
+ int n = recv( socket_id, buffer, LINE_BUF_SIZE, 0 ) ;
+ DBG(Rprintf( "reading %d bytes\n", n )) ;
+
+ char* s = buffer ;
+ while( *s ){
+ if( s[0] == '\n' || ( s[0] == '\r' && s[1] == '\n' ) ){
+ /* single empty line : end of headers */
+ DBG(Rprintf( "end of request - moving to body\n" ) );
+
+ /* skip the (CR)LF */
+ if( s[0] == '\r' ) s++ ;
+ s++ ;
+
+ if( content_length ){
+ response_body = (char*)malloc( content_length ) ;
+ /* fill this */
+ int pos = 0 ;
+ char* p = response_body ;
+ while( pos<content_length ){
+ if( !s ) break ;
+ *p = *s ;
+ p++; s++; pos++;
+ }
+ while( pos < content_length ){
+ /* FIXME: maybe we can stream this directly in the message */
+ n = recv( socket_id, p , (content_length - pos) , 0 ) ;
+ DBG(Rprintf( "reading %d bytes\n", n ) );
+ if( n < 1){
+ RESET( "reading the body" ) ;
+ }
+ pos += n ;
+ }
+ closesocket( socket_id ) ;
+ socket_id = -1 ;
+
+ /* the body was read in full, we can now fill the
+ response message */
+ GPB::Message* result = PROTOTYPE( method->output_type() ) ;
+ result->ParsePartialFromArray( response_body, content_length ) ;
+ return( new_RS4_Message_( result ) ) ;
+
+ } else {
+ Rf_error( "need Content-Length header" ) ;
+ }
+
+ }
+
+ /* read one line of header */
+ char* bol = s; /* beginning of line */
+ while (*s && *s != '\r' && *s != '\n') s++;
+ if (!*s) {
+ /* incomplete line - this must mean the response headers
+ were too large for the buffer; just generate an error
+ for now, fix later */
+ Rf_error( "response headers too large" ) ;
+ } else{
+ /* complete header line, parse it */
+ if (*s == '\r') *(s++) = 0;
+ if (*s == '\n') *(s++) = 0;
+
+ if( !success ){
+
+ unsigned int rll = strlen(bol); /* request line length */
+ if( rll < 9) {
+ Rf_error("response line too short : {%s}", bol ) ;
+ }
+
+ /* we have not parsed the first line of the
+ headers yet */
+ if( strncmp( bol, "HTTP/", 5 ) ){
+ Rf_error( "wrong protocol, expecting HTTP {%s}", bol ) ;
+ }
+
+ if( !( strncmp( bol+5, "1.0 ", 4) || strncmp(bol+5, "1.1 ", 4) )){
+ Rf_error( "HTTP version should be 1.0 or 1.1 : {%s}", bol ) ;
+ }
+
+ if( strncmp( bol+9, "200 OK", 6) ){
+ Rf_error( "invocation error : {%s}", bol+9 ) ;
+ }
+
+ /* otherwise, things are ok */
+ success = 1 ;
+ }
+
+ DBG(Rprintf("complete line: {%s}\n", bol)) ;
+
+ /* lower case before the : */
+ char *k = bol;
+ while (*k && *k != ':') {
+ if (*k >= 'A' && *k <= 'Z')
+ *k |= 0x20;
+ k++;
+ }
+
+ if (*k == ':') {
+ *(k++) = 0;
+ while (*k == ' ' || *k == '\t') k++;
+ printf("header '%s' => '%s'\n", bol, k);
+ if (!strcmp(bol, "content-length")) {
+ content_length = atoi(k);
+ }
+ }
+
+ /* TODO: check if result ok */
+
+ }
+
+ }
+
+ /* we only arrive here if something went wrong */
+
+ closesocket(socket_id);
+ socket_id = - 1 ;
+
+ return R_NilValue ;
+ }
+
+#undef DBG
+#undef RESET
+
+} // namespace rprotobuf
+
+
+
+
Modified: pkg/DESCRIPTION
===================================================================
--- pkg/DESCRIPTION 2010-04-08 07:49:53 UTC (rev 302)
+++ pkg/DESCRIPTION 2010-04-08 19:21:05 UTC (rev 303)
@@ -2,12 +2,12 @@
Version: 0.1-2
Date: $Date$
Author: Romain Francois <romain at r-enthusiasts.com> and Dirk Eddelbuettel <edd at debian.org>
-Maintainer: Romain Francois <romain at r-enthusiasts.com>
+Maintainer: Romain and Dirk <RomainAndDirk at r-enthusiasts.com>
Title: R Interface to the Protocol Buffers API
Description: Protocol Buffers are a way of encoding structured data in an
efficient yet extensible format. Google uses Protocol Buffers for almost all
of its internal RPC protocols and file formats.
-Depends: R (>= 2.10.0), Rcpp (>= 0.7.11.3), methods
+Depends: R (>= 2.10.0), RCurl, Rcpp (>= 0.7.11.3), methods
LinkingTo: Rcpp
Suggests: RUnit, highlight
SystemRequirements: Protocol Buffer compiler (to create C++ header and source files
Modified: pkg/R/rpc.R
===================================================================
--- pkg/R/rpc.R 2010-04-08 07:49:53 UTC (rev 302)
+++ pkg/R/rpc.R 2010-04-08 19:21:05 UTC (rev 303)
@@ -33,7 +33,7 @@
impl <- get( method at name, envir = IMPLEMENTATIONS )
check_valid_implementation( method, impl )
result <- impl( message )
- valid <- .Call( "valid_output_message", method at pointer, message at pointer,
+ valid <- .Call( "valid_output_message", method at pointer, result at pointer,
PACKAGE = "RProtoBuf" )
if( !valid ){
stop( "invalid method output" )
@@ -56,8 +56,18 @@
if( ! grepl( "^/", root ) ) root <- paste( "/", root, sep = "" )
if( ! grepl( "/$", root ) ) root <- paste( root, "/", sep = "" )
- .Call( "invoke_method_http", method at pointer, message at pointer,
- protocol at host, protocol at port, protocol at root, PACKAGE = "RProtoBuf" )
-
+ url <- sprintf( "http://%s:%d%s?service=%s&method=%s",
+ protocol at host, protocol at port, root, method at service, method$name() )
+
+ reader <- basicTextGatherer()
+ header <- c( "Content-Type" = "application/x-protobuf",
+ "Accept" = "application/x-protobuf" )
+ curlPerform(
+ url = url,
+ httpheader = header,
+ postfields = serialize(message, NULL) ,
+ writefunction = reader$update )
+ response <- charToRaw( reader$value() )
+ read( method$output_type(), response )
} )
Deleted: pkg/src/rpc_over_http.cpp
===================================================================
--- pkg/src/rpc_over_http.cpp 2010-04-08 07:49:53 UTC (rev 302)
+++ pkg/src/rpc_over_http.cpp 2010-04-08 19:21:05 UTC (rev 303)
@@ -1,226 +0,0 @@
-#include "rprotobuf.h"
-
-#include <sys/types.h>
-#include <sys/socket.h>
-
-#include "sisocks.h"
-
-/* FIXME: this should be probably handled by sisocks
- we need it for the TCP_NODELAY socket option */
-#include <netinet/tcp.h>
-
-#define RESET(MSG) closesocket(socket_id); \
- socket_id = - 1 ; \
- THROW_SOCKET_ERROR( "socket error" ) ;
-
-#define LINE_BUF_SIZE 1024
-
-/* debug output - change the DBG(X) X to enable debugging output */
-#define DBG(X) X
-
-
-namespace rprotobuf{
-
- /**
- * invoke an rpc method over http
- */
- SEXP invoke_method_http( SEXP method_xp, SEXP message_xp, SEXP host_xp, SEXP port_xp, SEXP root_xp ){
-
- /* grab parameters */
- GPB::MethodDescriptor* method = (GPB::MethodDescriptor*)EXTPTR_PTR(method_xp) ;
- GPB::Message* input = (GPB::Message*)EXTPTR_PTR( message_xp);
- int port = INTEGER(port_xp)[0];
- const char* host = CHAR( STRING_ELT( host_xp, 0) ) ;
- const char* root = CHAR( STRING_ELT( root_xp, 0) ) ;
-
- /* setup the socket */
- int socket_id = - 1 ;
- SAIN sa ;
- int opt = 1;
- socket_id = socket( PF_INET, SOCK_STREAM, 0 ) ;
- memset(&sa,0,sizeof(SAIN));
- sa.sin_family=AF_INET;
- sa.sin_port=htons(port);
- sa.sin_addr.s_addr=(host)?inet_addr(host):htonl(INADDR_ANY);
-
- if( setsockopt( socket_id , IPPROTO_TCP, TCP_NODELAY, &opt, sizeof(opt)) ){
- RESET( "setting socket options") ;
- }
-
- int res = connect(socket_id, (SA*)&sa, sizeof(sa));
- if( res==-1) {
- RESET( "cannot connect" ) ;
- }
-
- /* building the HTTP request header */
- char buf[33] ;
- int input_size = input->ByteSize() ;
- sprintf( buf, "%d", input_size ) ;
-
- std::string header = "POST " ;
- header += root ; /* we know root starts and ends with a / */
- header += method->service()->full_name();
- header += "?method=" ;
- header += method->name() ;
- header += " HTTP/1.0\r\nConnection: close\r\nHost:\r\nContent-Type:application/x-protobuf\r\nContent-Length: " ;
- header += buf ;
- header += "\r\n\r\n" ;
-
- /* send the header */
- DBG(Rprintf( "sending http request\n" )) ;
- int sent = 0 ;
- int total_sent = 0;
- int total = header.length() ;
- const char* c_str = header.c_str() ;
- char* p = (char*)c_str ;
- while( total_sent < total ){
- sent = send( socket_id, p, (total-total_sent), 0 ) ;
- total_sent = total_sent + sent ;
- DBG(Rprintf( "headers : sent %d bytes\n", total_sent )) ;
- DBG(Rprintf( "%s\n", header.c_str() )) ;
- p = p + sent ;
- }
-
- /* send the message payload */
- char payload[ input_size ] ;
- p = payload ;
- input->SerializeToArray( payload, input_size );
- total_sent = 0;
- while( total_sent < input_size ){
- sent = send( socket_id, p, (input_size-total_sent), 0 ) ;
- total_sent = total_sent + sent ;
- DBG(Rprintf( "input message : sent %d bytes\n", total_sent ) );
- p = p + sent ;
- }
-
- /* make sure this is enough to read all the headers */
- char buffer[LINE_BUF_SIZE] ;
- char* response_body ;
- int content_length = 0 ;
- int success = 0 ;
-
- int n = recv( socket_id, buffer, LINE_BUF_SIZE, 0 ) ;
- DBG(Rprintf( "reading %d bytes\n", n )) ;
-
- char* s = buffer ;
- while( *s ){
- if( s[0] == '\n' || ( s[0] == '\r' && s[1] == '\n' ) ){
- /* single empty line : end of headers */
- DBG(Rprintf( "end of request - moving to body\n" ) );
-
- /* skip the (CR)LF */
- if( s[0] == '\r' ) s++ ;
- s++ ;
-
- if( content_length ){
- response_body = (char*)malloc( content_length ) ;
- /* fill this */
- int pos = 0 ;
- char* p = response_body ;
- while( pos<content_length ){
- if( !s ) break ;
- *p = *s ;
- p++; s++; pos++;
- }
- while( pos < content_length ){
- /* FIXME: maybe we can stream this directly in the message */
- n = recv( socket_id, p , (content_length - pos) , 0 ) ;
- DBG(Rprintf( "reading %d bytes\n", n ) );
- if( n < 1){
- RESET( "reading the body" ) ;
- }
- pos += n ;
- }
- closesocket( socket_id ) ;
- socket_id = -1 ;
-
- /* the body was read in full, we can now fill the
- response message */
- GPB::Message* result = PROTOTYPE( method->output_type() ) ;
- result->ParsePartialFromArray( response_body, content_length ) ;
- return( new_RS4_Message_( result ) ) ;
-
- } else {
- Rf_error( "need Content-Length header" ) ;
- }
-
- }
-
- /* read one line of header */
- char* bol = s; /* beginning of line */
- while (*s && *s != '\r' && *s != '\n') s++;
- if (!*s) {
- /* incomplete line - this must mean the request
- was too large for the buffer, just generate an error
- for now, fix later */
- Rf_error( "response headers too large" ) ;
- } else{
- /* complete header line, parse it */
- if (*s == '\r') *(s++) = 0;
- if (*s == '\n') *(s++) = 0;
-
- if( !success ){
-
- unsigned int rll = strlen(bol); /* request line length */
- if( rll < 9) {
- Rf_error("response line too short : {%s}", bol ) ;
- }
-
- /* we have not parsed the first line of the
- headers yet */
- if( strncmp( bol, "HTTP/", 5 ) ){
- Rf_error( "wrong protocol, expecting HTTP {%s}", bol ) ;
- }
-
- if( !( strncmp( bol+5, "1.0 ", 4) || strncmp(bol+5, "1.1 ", 4) )){
- Rf_error( "HTTP version should be 1.0 or 1.1 : {%s}", bol ) ;
- }
-
- if( strncmp( bol+9, "200 OK", 6) ){
- Rf_error( "invocation error : {%s}", bol+9 ) ;
- }
-
- /* otherwise, things are ok */
- success = 1 ;
- }
-
- DBG(Rprintf("complete line: {%s}\n", bol)) ;
-
- /* lower case before the : */
- char *k = bol;
- while (*k && *k != ':') {
- if (*k >= 'A' && *k <= 'Z')
- *k |= 0x20;
- k++;
- }
-
- if (*k == ':') {
- *(k++) = 0;
- while (*k == ' ' || *k == '\t') k++;
- printf("header '%s' => '%s'\n", bol, k);
- if (!strcmp(bol, "content-length")) {
- content_length = atoi(k);
- }
- }
-
- /* TODO: check if result ok */
-
- }
-
- }
-
- /* we only arrive here if something went wrong */
-
- closesocket(socket_id);
- socket_id = - 1 ;
-
- return R_NilValue ;
- }
-
-
-
-} // namespace rprotobuf
-
-
-
-
Modified: pkg/src/rprotobuf.h
===================================================================
--- pkg/src/rprotobuf.h 2010-04-08 07:49:53 UTC (rev 302)
+++ pkg/src/rprotobuf.h 2010-04-08 19:21:05 UTC (rev 303)
@@ -322,9 +322,6 @@
RcppExport SEXP ServiceDescriptor_getMethodByIndex(SEXP, SEXP) ;
RcppExport SEXP ServiceDescriptor_getMethodByName(SEXP, SEXP) ;
-/* in rpc_over_http.cpp */
-RcppExport SEXP invoke_method_http( SEXP, SEXP, SEXP, SEXP, SEXP) ;
-
/* in streams.cpp */
void ZeroCopyInputStreamWrapper_finalizer( SEXP );
void ZeroCopyOutputStreamWrapper_finalizer( SEXP );
More information about the Rprotobuf-commits
mailing list