[Rprotobuf-commits] r303 - java java/src/org/rproject/rprotobuf old pkg pkg/R pkg/src

noreply at r-forge.r-project.org noreply at r-forge.r-project.org
Thu Apr 8 21:21:05 CEST 2010


Author: romain
Date: 2010-04-08 21:21:05 +0200 (Thu, 08 Apr 2010)
New Revision: 303

Added:
   old/rpc_over_http.cpp
Removed:
   pkg/src/rpc_over_http.cpp
Modified:
   java/build.xml
   java/src/org/rproject/rprotobuf/ProtobufHttpServer.java
   java/start.sh
   pkg/DESCRIPTION
   pkg/R/rpc.R
   pkg/src/rprotobuf.h
Log:
using RCurl for rpc over http

Modified: java/build.xml
===================================================================
--- java/build.xml	2010-04-08 07:49:53 UTC (rev 302)
+++ java/build.xml	2010-04-08 19:21:05 UTC (rev 303)
@@ -7,7 +7,7 @@
 	<property name="jar.dir" value="jar" />
 	<property name="lib.dir" value="lib" />
 	
-	<property name="protobuf.jar" value="${lib.dir}/protobuf-java-2.2.0.jar" />
+	<property name="protobuf.jar" value="${lib.dir}/protobuf-java-2.3.0.jar" />
 	<property name="generated.dir" value="generated" />
 	
 	<path id="classpath">

Modified: java/src/org/rproject/rprotobuf/ProtobufHttpServer.java
===================================================================
--- java/src/org/rproject/rprotobuf/ProtobufHttpServer.java	2010-04-08 07:49:53 UTC (rev 302)
+++ java/src/org/rproject/rprotobuf/ProtobufHttpServer.java	2010-04-08 19:21:05 UTC (rev 303)
@@ -35,13 +35,13 @@
     int port = DEFAULT_PORT ;
     String root = DEFAULT_ROOT ;
     
-		if( args != null && args.length >= 0 ){
+		if( args != null && args.length > 0 ){
 			try{
 				port = Integer.parseInt( args[0] ) ;
 			} catch( Exception e){ /* just use the default */ }
 		}
 		
-		if(args != null || args.length >= 1 ){
+		if(args != null && args.length > 1 ){
 			root = args[1]; 
 		}
 		

Modified: java/start.sh
===================================================================
--- java/start.sh	2010-04-08 07:49:53 UTC (rev 302)
+++ java/start.sh	2010-04-08 19:21:05 UTC (rev 303)
@@ -1,4 +1,4 @@
 #!/bin/bash
 
-java -cp lib/protobuf-java-2.2.0.jar:jar/RProtoBuf-java-http.jar org.rproject.rprotobuf.ProtobufHttpServer
+java -cp lib/protobuf-java-2.3.0.jar:jar/RProtoBuf-java-http.jar org.rproject.rprotobuf.ProtobufHttpServer 9002
 

Added: old/rpc_over_http.cpp
===================================================================
--- old/rpc_over_http.cpp	                        (rev 0)
+++ old/rpc_over_http.cpp	2010-04-08 19:21:05 UTC (rev 303)
@@ -0,0 +1,238 @@
+#include "rprotobuf.h"
+
+#include <sys/types.h>
+#include <sys/socket.h>
+
+#include "sisocks.h"
+
+/* FIXME: this should be probably handled by sisocks
+          we need it for the TCP_NODELAY socket option */
+#include <netinet/tcp.h>
+
+#define RESET(MSG) closesocket(socket_id); \
+	socket_id = - 1 ; \
+	THROW_SOCKET_ERROR( "socket error" ) ; 
+
+#define LINE_BUF_SIZE 1024
+
+/* debug output - change the DBG(X) X to enable debugging output */
+#define DBG(X) X
+
+
+namespace rprotobuf{
+	
+	/**
+	 * invoke an rpc method over http
+	 */
+	SEXP invoke_method_http( SEXP method_xp, SEXP message_xp, SEXP host_xp, SEXP port_xp, SEXP root_xp ){
+		
+		using namespace Rcpp ;
+		
+		/* grab parameters */
+		XPtr<GPB::MethodDescriptor> method(method_xp) ;
+		XPtr<GPB::Message> input(message_xp) ;
+		
+		int port = as<int>(port_xp); 
+		std::string host = as<std::string>(host_xp) ;
+		std::string root = as<std::string>(root_xp) ;
+		
+		/* setup the socket */
+		int socket_id = - 1 ;
+		SAIN sa ;
+		int opt = 1; 
+		socket_id = socket( PF_INET, SOCK_STREAM, 0 ) ;
+		memset(&sa,0,sizeof(SAIN));
+	  	sa.sin_family=AF_INET;
+	  	sa.sin_port=htons(port);
+	  	sa.sin_addr.s_addr=(host.c_str()) ? inet_addr(host.c_str()) : htonl(INADDR_ANY);
+	  	
+		if( setsockopt( socket_id , IPPROTO_TCP, TCP_NODELAY, &opt, sizeof(opt)) ){
+			RESET( "setting socket options") ;
+		}
+		
+		int res = connect(socket_id, (SA*)&sa, sizeof(sa)); 
+		if( res==-1) {
+			RESET( "cannot connect" ) ; 
+		}
+		
+		/* building the HTTP request header */
+		char buf[33] ;
+		int input_size = input->ByteSize() ;
+		sprintf( buf, "%d", input_size ) ;
+		DBG(Rprintf( "input message size: %d\n", input_size )) ;  
+		
+		std::string header = "POST " ;
+		header += root ; /* we know root starts and ends with a / */
+		header += method->service()->full_name();
+		header += "?method=" ;
+		header += method->name() ;
+		header += " HTTP/1.1\r\nConnection: close\r\nHost: 127.0.0.1:9000\r\nContent-Type:application/x-protobuf\r\nContent-Length:" ;
+		header += buf ;
+		header += "\r\n" ;
+		
+		/* send the header */
+		DBG(Rprintf( "sending http request\n" )) ;  
+		int sent = 0 ;
+		int total_sent = 0;
+		int total = header.size() + 1 ;
+		const char* c_str = header.c_str() ;
+		char* p = (char*)c_str ;
+		while( total_sent < total ){
+			sent = send( socket_id, p, (total-total_sent), 0 ) ;
+			total_sent = total_sent + sent ;
+			DBG(Rprintf( "headers : sent %d bytes\n", total_sent )) ;  
+			DBG(Rprintf( "==========================\n" )) ;  
+			DBG(Rprintf( "%s\n", header.c_str() )) ;
+			DBG(Rprintf( "==========================\n" )) ;  
+			p = p + sent ;
+		}
+		
+		DBG(Rprintf( "sending body\n" )) ;  
+		
+		/* send the message payload */
+		char payload[ input_size ] ;
+		p = payload ;
+		input->SerializeToArray( payload, input_size );
+		total_sent = 0;
+		while( total_sent < input_size ){
+			sent = send( socket_id, p, (input_size-total_sent), 0 ) ;
+			total_sent = total_sent + sent ;
+			DBG(Rprintf( "input message : sent %d bytes\n", total_sent ) );
+			p = p + sent ;
+		}
+		
+		DBG(Rprintf( "done\n" )) ;  
+		
+		/* make sure this is enough to read all the headers */
+		char buffer[LINE_BUF_SIZE] ;
+		char* response_body ;
+		int content_length = 0 ;
+		int success = 0 ;
+		
+		DBG(Rprintf( "recv\n" )) ;  
+		int n = recv( socket_id, buffer, LINE_BUF_SIZE, 0 ) ;
+		DBG(Rprintf( "reading %d bytes\n", n )) ;
+		
+		char* s = buffer ;
+		while( *s ){
+			if( s[0] == '\n' || ( s[0] == '\r' && s[1] == '\n' ) ){
+				/* single empty line : end of headers */
+				DBG(Rprintf( "end of request - moving to body\n" ) );
+				
+				/* skip the (CR)LF */
+				if( s[0] == '\r' ) s++ ; 
+				s++ ;
+				
+				if( content_length ){
+					response_body = (char*)malloc( content_length ) ;
+					/* fill this */
+					int pos = 0 ;
+					char* p = response_body ;
+					while( pos<content_length ){
+						if( !s ) break ;
+						*p = *s ;
+						p++; s++; pos++;
+					}
+					while( pos < content_length ){
+						/* FIXME: maybe we can stream this directly in the message */
+						n = recv( socket_id, p , (content_length - pos) , 0 ) ;
+						DBG(Rprintf( "reading %d bytes\n", n ) );
+						if( n < 1){
+							RESET( "reading the body" ) ; 
+						}
+						pos += n ;
+					}
+					closesocket( socket_id ) ;
+					socket_id = -1 ;
+					
+					/* the body was read in full, we can now fill the 
+					   response message */
+					GPB::Message* result = PROTOTYPE( method->output_type() ) ;
+					result->ParsePartialFromArray( response_body, content_length ) ;
+					return( new_RS4_Message_( result ) ) ;
+					
+				} else {
+					Rf_error( "need Content-Length header" ) ; 
+				}
+				
+			} 
+			
+			/* read one line of header */
+			char* bol = s; /* beginning of line */
+			while (*s && *s != '\r' && *s != '\n') s++;
+			if (!*s) { 
+				/* incomplete line - this must mean the request 
+				   was too large for the buffer, just generate an error 
+				   for now, fix later */
+				Rf_error( "response headers too large" ) ; 
+			} else{
+				/* complete header line, parse it */
+				if (*s == '\r') *(s++) = 0;
+				if (*s == '\n') *(s++) = 0;
+				
+				if( !success ){
+					
+					unsigned int rll = strlen(bol); /* request line length */
+					if( rll < 9) {
+						Rf_error("response line too short : {%s}", bol ) ; 
+					}
+					
+					/* we have not parsed the first line of the 
+					   headers yet */
+					if( strncmp( bol, "HTTP/", 5 ) ){
+						Rf_error( "wrong protocol, expecting HTTP {%s}", bol ) ; 
+					}
+					
+					if( !( strncmp( bol+5, "1.0 ", 4) || strncmp(bol+5, "1.1 ", 4) )){
+						Rf_error( "HTTP version should be 1.0 or 1.1 : {%s}", bol ) ; 
+					}
+					
+					if( strncmp( bol+9, "200 OK", 6) ){
+						Rf_error( "invocation error : {%s}", bol+9 ) ;
+					}
+					
+					/* otherwise, things are ok */
+					success = 1 ;
+				}
+				
+				DBG(Rprintf("complete line: {%s}\n", bol)) ;
+				
+				/* lower case before the : */
+				char *k = bol;
+				while (*k && *k != ':') {
+				    if (*k >= 'A' && *k <= 'Z')
+					*k |= 0x20;
+				    k++;
+				}
+				
+				if (*k == ':') {
+				    *(k++) = 0;
+				    while (*k == ' ' || *k == '\t') k++;
+				    printf("header '%s' => '%s'\n", bol, k);
+				    if (!strcmp(bol, "content-length")) {
+						content_length = atoi(k);
+				    }
+				}
+				
+				/* TODO: check if result ok */
+				
+			}
+			
+		}
+		
+		/* we only arrive here if something went wrong */
+		
+		closesocket(socket_id);
+		socket_id = - 1 ;
+		
+		return R_NilValue ;
+	}
+	
+#undef DBG
+#undef RESET
+	
+} // namespace rprotobuf
+
+
+
+

Modified: pkg/DESCRIPTION
===================================================================
--- pkg/DESCRIPTION	2010-04-08 07:49:53 UTC (rev 302)
+++ pkg/DESCRIPTION	2010-04-08 19:21:05 UTC (rev 303)
@@ -2,12 +2,12 @@
 Version: 0.1-2
 Date: $Date$
 Author: Romain Francois <romain at r-enthusiasts.com> and Dirk Eddelbuettel <edd at debian.org>
-Maintainer: Romain Francois <romain at r-enthusiasts.com>
+Maintainer: Romain and Dirk <RomainAndDirk at r-enthusiasts.com>
 Title: R Interface to the Protocol Buffers API
 Description: Protocol Buffers are a way of encoding structured data in an
  efficient yet extensible format. Google uses Protocol Buffers for almost all
  of its internal RPC protocols and file formats.  
-Depends: R (>= 2.10.0), Rcpp (>= 0.7.11.3), methods
+Depends: R (>= 2.10.0), RCurl, Rcpp (>= 0.7.11.3), methods
 LinkingTo: Rcpp
 Suggests: RUnit, highlight
 SystemRequirements: Protocol Buffer compiler (to create C++ header and source files

Modified: pkg/R/rpc.R
===================================================================
--- pkg/R/rpc.R	2010-04-08 07:49:53 UTC (rev 302)
+++ pkg/R/rpc.R	2010-04-08 19:21:05 UTC (rev 303)
@@ -33,7 +33,7 @@
 		impl <- get( method at name, envir = IMPLEMENTATIONS )
 		check_valid_implementation( method, impl )
 		result <- impl( message )
-		valid <- .Call( "valid_output_message", method at pointer, message at pointer, 
+		valid <- .Call( "valid_output_message", method at pointer, result at pointer, 
 			PACKAGE = "RProtoBuf" )
 		if( !valid ){
 			stop( "invalid method output" )
@@ -56,8 +56,18 @@
 	if( ! grepl( "^/", root ) ) root <- paste( "/", root, sep = "" )
 	if( ! grepl( "/$", root ) ) root <- paste( root, "/", sep = "" )
 	
-	.Call( "invoke_method_http", method at pointer, message at pointer, 
-		protocol at host, protocol at port, protocol at root, PACKAGE = "RProtoBuf" )
-	
+	url <- sprintf( "http://%s:%d%s?service=%s&method=%s", 
+		protocol at host, protocol at port, root, method at service, method$name() )
+	    
+	reader <- basicTextGatherer()
+	header <- c( "Content-Type" = "application/x-protobuf", 
+		"Accept" = "application/x-protobuf" )
+	curlPerform( 
+		url = url, 
+		httpheader = header, 
+		postfields = serialize(message, NULL) , 
+		writefunction = reader$update )
+	response <- charToRaw( reader$value() )
+	read( method$output_type(), response )	
 } )
 

Deleted: pkg/src/rpc_over_http.cpp
===================================================================
--- pkg/src/rpc_over_http.cpp	2010-04-08 07:49:53 UTC (rev 302)
+++ pkg/src/rpc_over_http.cpp	2010-04-08 19:21:05 UTC (rev 303)
@@ -1,226 +0,0 @@
-#include "rprotobuf.h"
-
-#include <sys/types.h>
-#include <sys/socket.h>
-
-#include "sisocks.h"
-
-/* FIXME: this should be probably handled by sisocks
-          we need it for the TCP_NODELAY socket option */
-#include <netinet/tcp.h>
-
-#define RESET(MSG) closesocket(socket_id); \
-	socket_id = - 1 ; \
-	THROW_SOCKET_ERROR( "socket error" ) ; 
-
-#define LINE_BUF_SIZE 1024
-
-/* debug output - change the DBG(X) X to enable debugging output */
-#define DBG(X) X
-
-
-namespace rprotobuf{
-	
-	/**
-	 * invoke an rpc method over http
-	 */
-	SEXP invoke_method_http( SEXP method_xp, SEXP message_xp, SEXP host_xp, SEXP port_xp, SEXP root_xp ){
-		
-		/* grab parameters */
-		GPB::MethodDescriptor* method = (GPB::MethodDescriptor*)EXTPTR_PTR(method_xp) ;
-		GPB::Message* input = (GPB::Message*)EXTPTR_PTR( message_xp); 
-		int port = INTEGER(port_xp)[0]; 
-		const char* host = CHAR( STRING_ELT( host_xp, 0) ) ;
-		const char* root = CHAR( STRING_ELT( root_xp, 0) ) ;
-		
-		/* setup the socket */
-		int socket_id = - 1 ;
-		SAIN sa ;
-		int opt = 1; 
-		socket_id = socket( PF_INET, SOCK_STREAM, 0 ) ;
-		memset(&sa,0,sizeof(SAIN));
-	  	sa.sin_family=AF_INET;
-	  	sa.sin_port=htons(port);
-	  	sa.sin_addr.s_addr=(host)?inet_addr(host):htonl(INADDR_ANY);
-	  	
-		if( setsockopt( socket_id , IPPROTO_TCP, TCP_NODELAY, &opt, sizeof(opt)) ){
-			RESET( "setting socket options") ;
-		}
-		
-		int res = connect(socket_id, (SA*)&sa, sizeof(sa)); 
-		if( res==-1) {
-			RESET( "cannot connect" ) ; 
-		}
-		
-		/* building the HTTP request header */
-		char buf[33] ;
-		int input_size = input->ByteSize() ;
-		sprintf( buf, "%d", input_size ) ;
-		
-		std::string header = "POST " ;
-		header += root ; /* we know root starts and ends with a / */
-		header += method->service()->full_name();
-		header += "?method=" ;
-		header += method->name() ;
-		header += " HTTP/1.0\r\nConnection: close\r\nHost:\r\nContent-Type:application/x-protobuf\r\nContent-Length: " ;
-		header += buf ;
-		header += "\r\n\r\n" ;
-		
-		/* send the header */
-		DBG(Rprintf( "sending http request\n" )) ;  
-		int sent = 0 ;
-		int total_sent = 0;
-		int total = header.length() ;
-		const char* c_str = header.c_str() ;
-		char* p = (char*)c_str ;
-		while( total_sent < total ){
-			sent = send( socket_id, p, (total-total_sent), 0 ) ;
-			total_sent = total_sent + sent ;
-			DBG(Rprintf( "headers : sent %d bytes\n", total_sent )) ;  
-			DBG(Rprintf( "%s\n", header.c_str() )) ;  
-			p = p + sent ;
-		}
-		
-		/* send the message payload */
-		char payload[ input_size ] ;
-		p = payload ;
-		input->SerializeToArray( payload, input_size );
-		total_sent = 0;
-		while( total_sent < input_size ){
-			sent = send( socket_id, p, (input_size-total_sent), 0 ) ;
-			total_sent = total_sent + sent ;
-			DBG(Rprintf( "input message : sent %d bytes\n", total_sent ) );  
-			p = p + sent ;
-		}
-		
-		/* make sure this is enough to read all the headers */
-		char buffer[LINE_BUF_SIZE] ;
-		char* response_body ;
-		int content_length = 0 ;
-		int success = 0 ;
-		
-		int n = recv( socket_id, buffer, LINE_BUF_SIZE, 0 ) ;
-		DBG(Rprintf( "reading %d bytes\n", n )) ;
-		
-		char* s = buffer ;
-		while( *s ){
-			if( s[0] == '\n' || ( s[0] == '\r' && s[1] == '\n' ) ){
-				/* single empty line : end of headers */
-				DBG(Rprintf( "end of request - moving to body\n" ) );
-				
-				/* skip the (CR)LF */
-				if( s[0] == '\r' ) s++ ; 
-				s++ ;
-				
-				if( content_length ){
-					response_body = (char*)malloc( content_length ) ;
-					/* fill this */
-					int pos = 0 ;
-					char* p = response_body ;
-					while( pos<content_length ){
-						if( !s ) break ;
-						*p = *s ;
-						p++; s++; pos++;
-					}
-					while( pos < content_length ){
-						/* FIXME: maybe we can stream this directly in the message */
-						n = recv( socket_id, p , (content_length - pos) , 0 ) ;
-						DBG(Rprintf( "reading %d bytes\n", n ) );
-						if( n < 1){
-							RESET( "reading the body" ) ; 
-						}
-						pos += n ;
-					}
-					closesocket( socket_id ) ;
-					socket_id = -1 ;
-					
-					/* the body was read in full, we can now fill the 
-					   response message */
-					GPB::Message* result = PROTOTYPE( method->output_type() ) ;
-					result->ParsePartialFromArray( response_body, content_length ) ;
-					return( new_RS4_Message_( result ) ) ;
-					
-				} else {
-					Rf_error( "need Content-Length header" ) ; 
-				}
-				
-			} 
-			
-			/* read one line of header */
-			char* bol = s; /* beginning of line */
-			while (*s && *s != '\r' && *s != '\n') s++;
-			if (!*s) { 
-				/* incomplete line - this must mean the request 
-				   was too large for the buffer, just generate an error 
-				   for now, fix later */
-				Rf_error( "response headers too large" ) ; 
-			} else{
-				/* complete header line, parse it */
-				if (*s == '\r') *(s++) = 0;
-				if (*s == '\n') *(s++) = 0;
-				
-				if( !success ){
-					
-					unsigned int rll = strlen(bol); /* request line length */
-					if( rll < 9) {
-						Rf_error("response line too short : {%s}", bol ) ; 
-					}
-					
-					/* we have not parsed the first line of the 
-					   headers yet */
-					if( strncmp( bol, "HTTP/", 5 ) ){
-						Rf_error( "wrong protocol, expecting HTTP {%s}", bol ) ; 
-					}
-					
-					if( !( strncmp( bol+5, "1.0 ", 4) || strncmp(bol+5, "1.1 ", 4) )){
-						Rf_error( "HTTP version should be 1.0 or 1.1 : {%s}", bol ) ; 
-					}
-					
-					if( strncmp( bol+9, "200 OK", 6) ){
-						Rf_error( "invocation error : {%s}", bol+9 ) ;
-					}
-					
-					/* otherwise, things are ok */
-					success = 1 ;
-				}
-				
-				DBG(Rprintf("complete line: {%s}\n", bol)) ;
-				
-				/* lower case before the : */
-				char *k = bol;
-				while (*k && *k != ':') {
-				    if (*k >= 'A' && *k <= 'Z')
-					*k |= 0x20;
-				    k++;
-				}
-				
-				if (*k == ':') {
-				    *(k++) = 0;
-				    while (*k == ' ' || *k == '\t') k++;
-				    printf("header '%s' => '%s'\n", bol, k);
-				    if (!strcmp(bol, "content-length")) {
-						content_length = atoi(k);
-				    }
-				}
-				
-				/* TODO: check if result ok */
-				
-			}
-			
-		}
-		
-		/* we only arrive here if something went wrong */
-		
-		closesocket(socket_id);
-		socket_id = - 1 ;
-		
-		return R_NilValue ;
-	}
-	
-	
-	
-} // namespace rprotobuf
-
-
-
-

Modified: pkg/src/rprotobuf.h
===================================================================
--- pkg/src/rprotobuf.h	2010-04-08 07:49:53 UTC (rev 302)
+++ pkg/src/rprotobuf.h	2010-04-08 19:21:05 UTC (rev 303)
@@ -322,9 +322,6 @@
 RcppExport SEXP ServiceDescriptor_getMethodByIndex(SEXP, SEXP) ;
 RcppExport SEXP ServiceDescriptor_getMethodByName(SEXP, SEXP) ;
 
-/* in rpc_over_http.cpp */
-RcppExport SEXP invoke_method_http( SEXP, SEXP, SEXP, SEXP, SEXP) ;
-
 /* in streams.cpp */
 void ZeroCopyInputStreamWrapper_finalizer( SEXP ); 
 void ZeroCopyOutputStreamWrapper_finalizer( SEXP ); 



More information about the Rprotobuf-commits mailing list