/* $Id: mq2raw.c,v 1.2 2023/02/23 06:17:41 urabe Exp $ */ /* "mq2raw.c" 2023.2.6 urabe */ /* modified from raw_ch.c */ /* 2023.2.6-2.13 mq2raw.c */ /* 2023.2.23 daemon mode */ #ifdef HAVE_CONFIG_H #include "config.h" #endif #include #include #include #include #include #include #include #include #include #include #include #if TIME_WITH_SYS_TIME #include #include #else /* !TIME_WITH_SYS_TIME */ #if HAVE_SYS_TIME_H #include #else /* !HAVE_SYS_TIME_H */ #include #endif /* !HAVE_SYS_TIME_H */ #endif /* !TIME_WITH_SYS_TIME */ #include "winlib.h" #include "daemon_mode.h" #define PORT 1883 #define SPORT 8883 #define CAFILE "/etc/ssl/cert.pem" #define CAPATH "/etc/ssl/certs" static const char rcsid[] = "$Id: mq2raw.c,v 1.2 2023/02/23 06:17:41 urabe Exp $"; char *progname,*logfile; static int daemon_mode; int syslog_mode,exit_status; struct Shm *shm; char *topic="win/data"; int debug=0; char tb[200]; struct mosquitto *mosq; int qos=0; time_t ltime_p; unsigned long n_packets,n_bytes; /* 64bit ok */ void report() { time_t ltime,j_timt,k_timt; time(<ime); j_timt=ltime-ltime_p; if(j_timt != 0) { k_timt=j_timt/2; if(ltime_p) { snprintf(tb,sizeof(tb), "%lu msgs %lu B in %ld s ( %ld msgs/s %ld B/s ) ", n_packets,n_bytes,(long)j_timt, (n_packets+k_timt)/j_timt,(n_bytes+k_timt)/j_timt); write_log(tb); n_packets=n_bytes=0; } ltime_p=ltime; } } void endprog() { mosquitto_destroy(mosq); mosquitto_lib_cleanup(); end_program(); } void usage(void) { WIN_version(); fprintf(stderr,"%s\n",rcsid); if(daemon_mode) fprintf(stderr, " usage : '%s (-gs) (-t [topic]) (-p [port]) (-q [qos]) (-i [id])\\\n\ (-c [cafile] / -d [capath]) (-C [certfile] -k [keyfile])\\\n\ (-u [user] -P [pswd])\\\n\ [host] [out_key] [shm_size(KB)] ([log file])'\n", progname); else fprintf(stderr, " usage : '%s (-gsD) (-t [topic]) (-p [port]) (-q [qos]) (-i [id])\\\n\ (-c [cafile] / -d [capath]) (-C [certfile] -k [keyfile])\\\n\ (-u [user] -P [pswd])\\\n\ [host] [out_key] [shm_size(KB)] ([log file])'\n", progname); } void on_connect(struct mosquitto *mosq,void *obj,int rc) { if(rc) { snprintf(tb,sizeof(tb),"%s",mosquitto_connack_string(rc)); write_log(tb); endprog(); } write_log("connected"); mosquitto_subscribe(mosq,NULL,topic,qos); } void on_disconnect(struct mosquitto *mosq,void *obj,int rc) { write_log("disconnected"); /* endprog();*/ } void on_message(struct mosquitto *mosq,void *obj,const struct mosquitto_message *m) { int i; uint8_w *ptw,*ptr; uint32_w uni; if(debug>0) { ptr=(unsigned char *)m->payload; for(i=0;i<16;i++) printf("%02X",ptr[i]); printf(" :%6d b rcvd topic=%s qos=%d mid=%d\n", m->payloadlen,m->topic,m->qos,m->mid); } n_packets++; n_bytes+=m->payloadlen; ptw=shm->d+shm->p; uni=m->payloadlen+12; /* size+tow+payload+size */ *ptw++=uni>>24; /* size (H) */ *ptw++=uni>>16; *ptw++=uni>>8; *ptw++=uni; /* size (L) */ uni=(uint32_w)(time(NULL)-TIME_OFFSET); *ptw++=uni>>24; /* tow (H) */ *ptw++=uni>>16; *ptw++=uni>>8; *ptw++=uni; /* tow (L) */ memcpy(ptw,m->payload,m->payloadlen); ptw+=m->payloadlen; uni=m->payloadlen+12; /* size+tow+payload+size */ *ptw++=uni>>24; /* size (H) */ *ptw++=uni>>16; *ptw++=uni>>8; *ptw++=uni; /* size (L) */ if(debug>1) { printf(" "); for(i=0;i<16;i++) printf("%02X",shm->d[shm->p+i]); printf(" :%6d b written\n",uni); } shm->r=shm->p; if(ptw>shm->d+(unsigned long)shm->pl) ptw=shm->d; shm->p=ptw-shm->d; shm->c++; } int main(int argc, char *argv[]) { key_t rawkey; int c,i,rc; size_t size_shm; uint16_t port=PORT; char *host=NULL,*id=NULL,*user=NULL,*pswd=NULL,*capath=NULL, *certfile=NULL,*keyfile=NULL,*cafile=NULL; int ssl=0; int portset=0; int ca_default=1; if((progname=strrchr(argv[0],'/')) != NULL) progname++; else progname=argv[0]; daemon_mode=syslog_mode=0; exit_status=EXIT_SUCCESS; if(strcmp(progname,"mq2rawd")==0) daemon_mode=1; while((c=getopt(argc,argv,"c:C:d:Dgi:k:p:P:q:st:u:"))!=-1) { switch(c) { case 'c': /* cafile */ cafile=optarg; ca_default=0; break; case 'C': /* certfile */ certfile=optarg; break; case 'd': /* capath */ capath=optarg; ca_default=0; break; case 'D': /* daemon mode */ daemon_mode = 1; break; case 'g': /* debug print */ debug++; break; case 'i': /* unique client id */ id=optarg; break; case 'k': /* keyfile */ keyfile=optarg; break; case 'p': /* port */ port=(uint16_t)atoi(optarg); portset=1; break; case 'P': /* password */ pswd=optarg; break; case 'q': /* qos */ qos=atoi(optarg); break; case 's': /* use ssl/tls */ ssl=1; break; case 't': /* topic */ topic=optarg; break; case 'u': /* username */ user=optarg; break; default: fprintf(stderr," option -%c unknown\n",c); usage(); exit(1); } } optind--; if(argc<4+optind) { usage(); exit(1); } host=argv[1+optind]; rawkey=atol(argv[2+optind]); size_shm=(size_t)atol(argv[3+optind])*1000; if(argc>4+optind) logfile=argv[4+optind]; else { logfile=NULL; if(daemon_mode) syslog_mode=1; } /* daemon mode */ if (daemon_mode) { daemon_init(progname, LOG_USER, syslog_mode); umask(022); } if(qos<0 || qos>2) { write_log("invalid QoS value"); exit(1); } if(ssl && !portset) port=SPORT; exit_status=0; n_packets=n_bytes=0; write_log("start"); mosquitto_lib_init(); if(id==NULL) mosq=mosquitto_new(NULL,1,NULL); else mosq=mosquitto_new(id,0,NULL); if(!mosq) { write_log("failed to create mosquitto"); endprog(); } mosquitto_username_pw_set(mosq,user,pswd); if(ssl) { if(ca_default) { rc=mosquitto_tls_set(mosq,cafile=CAFILE,capath,certfile,keyfile,NULL); if(rc!=MOSQ_ERR_SUCCESS) { rc=mosquitto_tls_set(mosq,cafile=NULL,capath=CAPATH,certfile,keyfile,NULL); if(rc!=MOSQ_ERR_SUCCESS) { snprintf(tb,sizeof(tb),"mosquitto_tls_set failed(%d)\n",rc); write_log(tb); endprog(); } } } else { rc=mosquitto_tls_set(mosq,cafile,capath,certfile,keyfile,NULL); if(rc!=MOSQ_ERR_SUCCESS) { snprintf(tb,sizeof(tb),"mosquitto_tls_set failed(%d)\n",rc); write_log(tb); endprog(); } } } if(ssl) snprintf(tb,sizeof(tb),"mqtts://%s:%s@%s:%d topic=%s qos=%d", user,pswd,host,port,topic,qos); else snprintf(tb,sizeof(tb),"mqtt://%s:%s@%s:%d topic=%s qos=%d", user,pswd,host,port,topic,qos); write_log(tb); if(ssl) { if(cafile) {snprintf(tb,sizeof(tb),"cafile=%s",cafile);write_log(tb);} if(capath) {snprintf(tb,sizeof(tb),"capath=%s",capath);write_log(tb);} if(certfile) {snprintf(tb,sizeof(tb),"certfile=%s",certfile);write_log(tb);} if(keyfile) {snprintf(tb,sizeof(tb),"keyfile=%s",keyfile);write_log(tb);} } if(id) {snprintf(tb,sizeof(tb),"id=%s",id);write_log(tb);} /* out shared memory */ shm=Shm_create(rawkey,size_shm,"out"); Shm_init(shm,size_shm); mosquitto_connect_callback_set(mosq,on_connect); mosquitto_disconnect_callback_set(mosq,on_disconnect); mosquitto_message_callback_set(mosq,on_message); if(rc=mosquitto_connect_bind(mosq,host,port,60,NULL)) { strerror_r(errno,tb,sizeof(tb)); write_log("failed to connect broker"); endprog(); } signal(SIGTERM,(void *)end_program); signal(SIGINT,(void *)end_program); signal(SIGHUP,(void *)report); time(<ime_p); rc=mosquitto_loop_forever(mosq,-1,1); endprog(); }