/* * Copyright (c) 2006-2017 Hypertriton, Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE FOR * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL * DAMAGES (INCLUDING BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE * USE OF THIS SOFTWARE EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ #include #include #include #include #include #include #include #include #include #include #include #include #include "mailprocd.h" #include "pathnames.h" int qmgrMaxIdle; /* Inactivity timeout */ static int qmgrMsgCount = 0; /* Messages delivered */ QMGR_WorkerQ qmgrWorkers; /* MAIN: Active workers */ Uint qmgrWorkerCount=0; static volatile int SigDIE = 0; char *pathQueue; char pathQueueTmpDir[FILENAME_MAX]; int QMGR_Read(int fd, void *data, size_t len) { size_t nread; ssize_t rv; for (nread = 0; nread < len; ) { rv = read(fd, data+nread, len-nread); if (rv == -1) { if (errno == EINTR || errno == EAGAIN) { if (QMGR_CheckSignals()) { exit(1); } continue; } else { MPD_SetError("QMGR_Read: %s", strerror(errno)); return (-1); } } else if (rv == 0) { MPD_SetErrorS("EOF"); return (-1); } nread += rv; } return (0); } int QMGR_Write(int fd, const void *data, size_t len) { size_t nwrote; ssize_t rv; for (nwrote = 0; nwrote < len; ) { rv = write(fd, data+nwrote, len-nwrote); if (rv == -1) { if (errno == EINTR || errno == EAGAIN) { if (QMGR_CheckSignals()) { exit(1); } continue; } else { MPD_SetError("QMGR_Write: %s", strerror(errno)); return (-1); } } else if (rv == 0) { MPD_SetErrorS("EOF"); return (-1); } nwrote += rv; } return (0); } int QMGR_Init(CFG_File *cf) { struct stat sb; CFG_GetStr(cf, "qmgr.path", &pathQueue, _PATH_QUEUE); CFG_GetInt(cf, "qmgr.max-idle", &qmgrMaxIdle, 30); Strlcpy(pathQueueTmpDir, pathQueue, sizeof(pathQueueTmpDir)); Strlcat(pathQueueTmpDir, "tmp/", sizeof(pathQueueTmpDir)); if (stat(pathQueue, &sb) != 0 && mkdir(pathQueue, 0700) == -1) { MPD_SetError("%s: %s", pathQueue, strerror(errno)); return (-1); } if (stat(pathQueueTmpDir, &sb) != 0 && mkdir(pathQueueTmpDir, 0700) == -1) { MPD_SetError("%s: %s", pathQueue, strerror(errno)); return (-1); } return (0); } /* Return the number of items in the queue. */ int QMGR_Todo(void) { DIR *dir; int count = 0; if ((dir = opendir(pathQueue)) == NULL) { syslog(LOG_ERR, "opendir %s: %m", pathQueue); return (0); } while (readdir(dir) != NULL) { count++; } closedir(dir); return (count>2 ? count-2 : 0); } /* * Load a message from the queue. If the meta argument is NULL, metadata is * recovered from file. */ MPD_Message * QMGR_LoadMessage(const char *path, const QMGR_MetaData *meta) { char rcpt_to[ADDRESS_MAX]; QMGR_MetaData rmeta; MPD_Recipient *rcpt; MPD_Message *msg; off_t msgEnd; int fd; try_open: if ((fd = open(path, O_RDONLY)) == -1) { if (errno == EINTR) { if (QMGR_CheckSignals()) { return (NULL); } goto try_open; } MPD_SetErrorS(strerror(errno)); return (NULL); } if ((msg = MPD_MessageNew()) == NULL) return (NULL); if (meta != NULL) { Strlcpy(msg->mail_from, meta->from, sizeof(msg->mail_from)); Strlcpy(rcpt_to, meta->rcpt, sizeof(rcpt_to)); } else { if (QMGR_Read(fd, &rmeta, sizeof(QMGR_MetaData)) == -1) { MPD_SetError("Read meta: %s", MPD_GetError()); goto fail; } Strlcpy(msg->mail_from, rmeta.from, sizeof(msg->mail_from)); Strlcpy(msg->ip, rmeta.ip, sizeof(msg->ip)); Strlcpy(rcpt_to, rmeta.rcpt, sizeof(rcpt_to)); } if ((rcpt = MPD_MessageAddRecipient(msg, rcpt_to)) == NULL || MPD_ParseRecipientParts(rcpt) == -1) goto fail; /* Read the message body */ if ((msgEnd = lseek(fd, 0, SEEK_END)) == -1) { MPD_SetError("SEEK_END: %s", strerror(errno)); goto fail; } if (lseek(fd, sizeof(QMGR_MetaData), SEEK_SET) == -1) { MPD_SetError("SEEK_SET: %s", strerror(errno)); goto fail; } msg->text_len = (size_t)msgEnd - sizeof(QMGR_MetaData); if ((msg->text = malloc(msg->text_len+1)) == NULL) { MPD_SetErrorS("Out of memory"); goto fail; } if (QMGR_Read(fd, msg->text, msg->text_len) == -1) { MPD_SetError("LoadMessage: %s", MPD_GetError()); goto fail; } msg->text[msg->text_len] = '\0'; close(fd); return (msg); fail: MPD_MessageFree(msg); close(fd); return (NULL); } /* * Enter a message onto the queue. On success, write metadata back to * the master process. * * CONTEXT: SMTP Worker subprocess. */ int QMGR_Queue(MPD_Message *msg, MPD_Recipient *rcpt, int masterPipe) { char pathDst[FILENAME_MAX], pathDstBase[FILENAME_MAX]; char pathTmp[FILENAME_MAX]; struct iovec vec[2]; QMGR_MetaData meta; struct stat sb; char *queueID; int fd; bzero(&meta, sizeof(meta)); if (LOCAL_GetDefaultRecipientUID(rcpt->addr, &meta.uid, &meta.gid) == -1) return (-1); Strlcpy(pathTmp, pathQueueTmpDir, sizeof(pathTmp)); Strlcat(pathTmp, "XXXXXXXXXX", sizeof(pathTmp)); if ((fd = mkstemp(pathTmp)) == -1) { MPD_SetError("mkstemp: %s", strerror(errno)); return (-1); } if ((queueID = strrchr(pathTmp, '/')) == NULL || queueID[1] == '\0') { MPD_SetErrorS("mkstemp"); goto fail_rm; } queueID++; snprintf(pathDstBase, sizeof(pathDstBase), "%s%u/", pathQueue, meta.uid); if (stat(pathDstBase, &sb) != 0) { Debug("Creating: %s", pathDstBase); if (mkdir(pathDstBase, 0700) == -1) { MPD_SetError("%s: %s", pathDstBase, strerror(errno)); return (-1); } } Strlcpy(pathDst, pathDstBase, sizeof(pathDst)); Strlcat(pathDst, queueID, sizeof(pathDst)); if (fchown(fd, meta.uid, meta.gid) == -1) { MPD_SetError("%s: chown: %s", pathTmp, strerror(errno)); goto fail_rm; } Strlcpy(meta.qid, queueID, sizeof(meta.qid)); Strlcpy(meta.from, msg->mail_from, sizeof(meta.from)); Strlcpy(meta.rcpt, rcpt->addr, sizeof(meta.rcpt)); Strlcpy(meta.ip, msg->ip, sizeof(meta.ip)); /* * Save the metadata followed by the message body. The metadata is * only actually used in the event of a crash or server restart. */ vec[0].iov_base = &meta; vec[0].iov_len = sizeof(meta); vec[1].iov_base = msg->text; vec[1].iov_len = msg->text_len; if (QMGR_Writev(fd, vec, 2) == -1) { goto fail; } fsync(fd); close(fd); /* * Now that the message is safely written to the queue, notify * the master process by sending it the metadata block. */ if (rename(pathTmp, pathDst) == -1) { MPD_SetError("rename(%s->%s): %s", pathTmp, pathDst, strerror(errno)); return (-1); } return QMGR_Write(masterPipe, &meta, sizeof(meta)); fail_rm: unlink(pathTmp); fail: close(fd); return (-1); } /* Process and deliver a message. If successful, dequeue it as well. */ static int QMGR_ProcessMessage(const char *path, const QMGR_MetaData *meta) { MPD_Message *msg; MPD_Recipient *rcpt; int rv; if ((msg = QMGR_LoadMessage(path, meta)) == NULL) return (-1); rcpt = TAILQ_FIRST(&msg->rcpts); /* Only 1 rcpt possible */ rcpt->ml = ML_GetMailListReq(rcpt); if (rcpt->ml != NULL) { /* Mailing list */ rv = ML_MessageProcess(msg, rcpt); Debug("[ML] From <%s> to List <%s>, %luK", msg->mail_from, rcpt->ml->name, (Ulong)msg->text_len/1024); } else { /* Regular address */ rv = MPD_MessageProcess(msg, rcpt); Debug("[MSG] From <%s> to <%s>, %luK", msg->mail_from, rcpt->addr, (Ulong)msg->text_len/1024); } MPD_MessageFree(msg); if (rv == 0) { Unlink(path); } else if (rv == -1) { syslog(LOG_ERR, "%s: PERM: %s", path, MPD_GetError()); unlink(path); } else if (rv == 1) { syslog(LOG_ERR, "%s: SOFTFAIL: %s", path, MPD_GetError()); } if (getuid() != 0 || geteuid() != 0 || /* Sanity check */ getgid() != 0 || getegid() != 0) { syslog(LOG_ERR, "QMGR bad uid %d:%d", (int)getuid(), (int)geteuid()); exit(1); } return (0); } /* * Scan the queue and process messages under my UID. Return number of * messages processed or -1 on fatal error. */ static int QMGR_ProcessQueue(uid_t uid) { char pathBase[FILENAME_MAX]; char path[FILENAME_MAX]; DIR *dir; struct dirent *dp; struct stat sb; int count = 0; snprintf(pathBase, sizeof(pathBase), "%s%u/", pathQueue, uid); if ((dir = opendir(pathBase)) == NULL) { MPD_SetError("%s: %s", pathBase, strerror(errno)); return (-1); } while ((dp = readdir(dir)) != NULL) { if (dp->d_name[0] == '.') { continue; } Strlcpy(path, pathBase, sizeof(path)); Strlcat(path, dp->d_name, sizeof(path)); if (stat(path, &sb) == -1 || sb.st_uid != uid) { syslog(LOG_WARNING, "Bad qfile: %s", path); continue; } if (QMGR_ProcessMessage(path, NULL) == -1) { syslog(LOG_ERR, "Queue(%s): %s", path, MPD_GetError()); continue; } count++; } closedir(dir); qmgrMsgCount += count; return (count); } static void SigTERM(int sigraised) { SigDIE++; } int QMGR_CheckSignals(void) { if (SigDIE) { SigDIE = 0; MPD_SetErrorS("Exiting (signal)"); return (1); } return (0); } /* * Spawn a new worker process if necessary. * CONTEXT: Main. */ int QMGR_WakeWorker(const QMGR_MetaData *meta) { QMGR_Worker *worker; TAILQ_FOREACH(worker, &qmgrWorkers, workers) { if (worker->uid == meta->uid && worker->gid == meta->gid) break; } if (worker == NULL) { int pp[2]; pid_t pid; if (qmgrWorkerCount+1 > mpdMaxWorkers) { MPD_SetErrorS("Too many workers"); return (-1); } Debug("Creating new worker for %d:%d", meta->uid, meta->gid); if (pipe(pp) == -1) { MPD_SetError("pipe: %s", strerror(errno)); return (-1); } if ((pid = fork()) == -1) { MPD_SetError("fork: %s", strerror(errno)); close(pp[0]); close(pp[1]); return (-1); } else if (pid == 0) { /* Child */ dup2(pp[0], STDIN_FILENO); close(pp[1]); MPD_EnterServerProc("worker"); if (QMGR_WorkerMain(meta->uid) == -1) { syslog(LOG_ERR, "WORKER: %s", MPD_GetError()); } MPD_ExitServerProc(0); } else { worker = Malloc(sizeof(QMGR_Worker)); worker->pid = pid; worker->uid = meta->uid; worker->gid = meta->gid; worker->pipe = pp[1]; close(pp[0]); TAILQ_INSERT_TAIL(&qmgrWorkers, worker, workers); qmgrWorkerCount++; } } return (0); } /* Main worker process routine. */ int QMGR_WorkerMain(uid_t uid) { char pathBase[FILENAME_MAX]; struct sigaction sa; int rv, fdDir, kq; struct kevent change, event; struct timespec timeo; sa.sa_handler = SigTERM; sigfillset(&sa.sa_mask); sa.sa_flags = 0; sigaction(SIGTERM, &sa, NULL); sigaction(SIGUSR1, &sa, NULL); if (uid < QMGR_UIDMIN || (getpwuid(uid) == NULL)) { MPD_SetError("Bad UID (%d)", uid); return (-1); } Setproctitle("worker %u (...)", uid); /* Process the initial message that caused our invocation. */ if (QMGR_ProcessQueue(uid) == -1) return (-1); Setproctitle("worker %u (1 msg)", uid); if ((kq = kqueue()) == -1) { MPD_SetError("kqueue: %s", strerror(errno)); return (-1); } /* Monitor the queue directory for more messages, or timeout */ snprintf(pathBase, sizeof(pathBase), "%s%u/", pathQueue, uid); if ((fdDir = open(pathBase, O_RDONLY|O_DIRECTORY)) == -1) { MPD_SetError("%s: %s", pathBase, strerror(errno)); goto fail; } EV_SET(&change, fdDir, EVFILT_VNODE, EV_ADD | EV_ENABLE | EV_ONESHOT, NOTE_DELETE | NOTE_EXTEND | NOTE_WRITE | NOTE_ATTRIB | NOTE_LINK, 0, 0); timeo.tv_sec = qmgrMaxIdle; timeo.tv_nsec = 0; for (;;) { int nev; if ((nev = kevent(kq, &change, 1, &event, 1, &timeo)) == -1) { if (errno == EINTR) { if (QMGR_CheckSignals()) { break; } continue; } else { MPD_SetErrorS("kevent"); goto fail; } } if (nev > 0) { if ((rv = QMGR_ProcessQueue(uid)) == -1) goto fail; } else if (nev == 0) { break; } } close(fdDir); close(kq); return (0); fail: close(kq); return (-1); }