Skip to content

Commit

Permalink
Reduce redundant call of prepareClientToWrite when call addReply* con…
Browse files Browse the repository at this point in the history
…tinuously.

Signed-off-by: Lipeng Zhu <[email protected]>
Co-authored-by: Wangyang Guo <[email protected]>
  • Loading branch information
lipzhu and guowangy committed Jun 19, 2024
1 parent 5a51bf5 commit 3ec1437
Show file tree
Hide file tree
Showing 9 changed files with 72 additions and 60 deletions.
4 changes: 2 additions & 2 deletions src/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -489,7 +489,7 @@ void migrateCommand(client *c) {
if (num_keys == 0) {
zfree(ov);
zfree(kv);
addReplySds(c, sdsnew("+NOKEY\r\n"));
addReplySds(c, sdsnew("+NOKEY\r\n"), 0);
return;
}

Expand Down Expand Up @@ -1425,7 +1425,7 @@ void clusterCommandSlots(client *c) {
debugServerAssertWithInfo(c, NULL, verifyCachedClusterSlotsResponse(cached_reply) == 1);
}

addReplyProto(c, cached_reply, sdslen(cached_reply));
addReplyProto(c, cached_reply, sdslen(cached_reply), 0);
}

/* -----------------------------------------------------------------------------
Expand Down
2 changes: 1 addition & 1 deletion src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -6221,7 +6221,7 @@ int clusterCommandSpecial(client *c) {
int retval = clusterBumpConfigEpochWithoutConsensus();
sds reply = sdscatprintf(sdsempty(), "+%s %llu\r\n", (retval == C_OK) ? "BUMPED" : "STILL",
(unsigned long long)myself->configEpoch);
addReplySds(c, reply);
addReplySds(c, reply, 0);
} else if (!strcasecmp(c->argv[1]->ptr, "saveconfig") && c->argc == 2) {
int retval = clusterSaveConfig(1);

Expand Down
2 changes: 1 addition & 1 deletion src/debug.c
Original file line number Diff line number Diff line change
Expand Up @@ -859,7 +859,7 @@ void debugCommand(client *c) {
errstr = sdscatsds(errstr, c->argv[2]->ptr);
errstr = sdsmapchars(errstr, "\n\r", " ", 2); /* no newlines in errors. */
errstr = sdscatlen(errstr, "\r\n", 2);
addReplySds(c, errstr);
addReplySds(c, errstr, 0);
} else if (!strcasecmp(c->argv[1]->ptr, "structsize") && c->argc == 2) {
sds sizes = sdsempty();
sizes = sdscatprintf(sizes, "bits:%d ", (sizeof(void *) == 8) ? 64 : 32);
Expand Down
9 changes: 5 additions & 4 deletions src/module.c
Original file line number Diff line number Diff line change
Expand Up @@ -3110,9 +3110,10 @@ int VM_ReplyWithErrorFormat(ValkeyModuleCtx *ctx, const char *fmt, ...) {
int VM_ReplyWithSimpleString(ValkeyModuleCtx *ctx, const char *msg) {
client *c = moduleGetReplyClient(ctx);
if (c == NULL) return VALKEYMODULE_OK;
addReplyProto(c, "+", 1);
addReplyProto(c, msg, strlen(msg));
addReplyProto(c, "\r\n", 2);
if (prepareClientToWrite(c) != C_OK) return VALKEYMODULE_OK;
addReplyProto(c, "+", 1, 1);
addReplyProto(c, msg, strlen(msg), 1);
addReplyProto(c, "\r\n", 2, 1);
return VALKEYMODULE_OK;
}

Expand Down Expand Up @@ -3426,7 +3427,7 @@ int VM_ReplyWithCallReply(ValkeyModuleCtx *ctx, ValkeyModuleCallReply *reply) {
}
size_t proto_len;
const char *proto = callReplyGetProto(reply, &proto_len);
addReplyProto(c, proto, proto_len);
addReplyProto(c, proto, proto_len, 0);
/* Propagate the error list from that reply to the other client, to do some
* post error reply handling, like statistics.
* Note that if the original reply had an array with errors, and the module
Expand Down
103 changes: 57 additions & 46 deletions src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -485,11 +485,13 @@ void addReply(client *c, robj *obj) {

/* Add the SDS 's' string to the client output buffer, as a side effect
* the SDS string is freed. */
void addReplySds(client *c, sds s) {
if (prepareClientToWrite(c) != C_OK) {
/* The caller expects the sds to be free'd. */
sdsfree(s);
return;
void addReplySds(client *c, sds s, int skip_client_to_write_check) {
if (!skip_client_to_write_check) {
if (prepareClientToWrite(c) != C_OK) {
/* The caller expects the sds to be free'd. */
sdsfree(s);
return;
}
}
_addReplyToBufferOrList(c, s, sdslen(s));
sdsfree(s);
Expand All @@ -503,8 +505,10 @@ void addReplySds(client *c, sds s) {
* if not needed. The object will only be created by calling
* _addReplyProtoToList() if we fail to extend the existing tail object
* in the list of objects. */
void addReplyProto(client *c, const char *s, size_t len) {
if (prepareClientToWrite(c) != C_OK) return;
void addReplyProto(client *c, const char *s, size_t len, int skip_client_to_write_check) {
if (!skip_client_to_write_check) {
if (prepareClientToWrite(c) != C_OK) return;
}
_addReplyToBufferOrList(c, s, len);
}

Expand All @@ -518,11 +522,12 @@ void addReplyProto(client *c, const char *s, size_t len) {
* error code is automatically added.
* Note that 's' must NOT end with \r\n. */
void addReplyErrorLength(client *c, const char *s, size_t len) {
if (prepareClientToWrite(c) != C_OK) return;
/* If the string already starts with "-..." then the error code
* is provided by the caller. Otherwise we use "-ERR". */
if (!len || s[0] != '-') addReplyProto(c, "-ERR ", 5);
addReplyProto(c, s, len);
addReplyProto(c, "\r\n", 2);
if (!len || s[0] != '-') addReplyProto(c, "-ERR ", 5, 1);
addReplyProto(c, s, len, 1);
addReplyProto(c, "\r\n", 2, 1);
}

/* Do some actions after an error reply was sent (Log if needed, updates stats, etc.)
Expand Down Expand Up @@ -714,9 +719,10 @@ void addReplyErrorExpireTime(client *c) {
}

void addReplyStatusLength(client *c, const char *s, size_t len) {
addReplyProto(c, "+", 1);
addReplyProto(c, s, len);
addReplyProto(c, "\r\n", 2);
if (prepareClientToWrite(c) != C_OK) return;
addReplyProto(c, "+", 1, 1);
addReplyProto(c, s, len, 1);
addReplyProto(c, "\r\n", 2, 1);
}

void addReplyStatus(client *c, const char *status) {
Expand Down Expand Up @@ -906,7 +912,7 @@ void addReplyDouble(client *c, double d) {
dbuf[dlen + 1] = '\r';
dbuf[dlen + 2] = '\n';
dbuf[dlen + 3] = '\0';
addReplyProto(c, dbuf, dlen + 3);
addReplyProto(c, dbuf, dlen + 3, 0);
} else {
char dbuf[MAX_LONG_DOUBLE_CHARS + 32];
/* In order to prepend the string length before the formatted number,
Expand All @@ -929,17 +935,18 @@ void addReplyDouble(client *c, double d) {
dbuf[dlen + 7] = '\r';
dbuf[dlen + 8] = '\n';
dbuf[dlen + 9] = '\0';
addReplyProto(c, dbuf + start, dlen + 9 - start);
addReplyProto(c, dbuf + start, dlen + 9 - start, 0);
}
}

void addReplyBigNum(client *c, const char *num, size_t len) {
if (c->resp == 2) {
addReplyBulkCBuffer(c, num, len);
} else {
addReplyProto(c, "(", 1);
addReplyProto(c, num, len);
addReplyProto(c, "\r\n", 2);
if (prepareClientToWrite(c) != C_OK) return;
addReplyProto(c, "(", 1, 1);
addReplyProto(c, num, len, 1);
addReplyProto(c, "\r\n", 2, 1);
}
}

Expand All @@ -952,17 +959,18 @@ void addReplyHumanLongDouble(client *c, long double d) {
addReplyBulk(c, o);
decrRefCount(o);
} else {
if (prepareClientToWrite(c) != C_OK) return;
char buf[MAX_LONG_DOUBLE_CHARS];
int len = ld2string(buf, sizeof(buf), d, LD_STR_HUMAN);
addReplyProto(c, ",", 1);
addReplyProto(c, buf, len);
addReplyProto(c, "\r\n", 2);
addReplyProto(c, ",", 1, 1);
addReplyProto(c, buf, len, 1);
addReplyProto(c, "\r\n", 2, 1);
}
}

/* Add a long long as integer reply or bulk len / multi bulk count.
* Basically this is used to output <prefix><long long><crlf>. */
void addReplyLongLongWithPrefix(client *c, long long ll, char prefix) {
void addReplyLongLongWithPrefix(client *c, long long ll, char prefix, int skip_client_to_write_check) {
char buf[128];
int len;

Expand All @@ -972,24 +980,24 @@ void addReplyLongLongWithPrefix(client *c, long long ll, char prefix) {
const int opt_hdr = ll < OBJ_SHARED_BULKHDR_LEN && ll >= 0;
const size_t hdr_len = OBJ_SHARED_HDR_STRLEN(ll);
if (prefix == '*' && opt_hdr) {
addReplyProto(c, shared.mbulkhdr[ll]->ptr, hdr_len);
addReplyProto(c, shared.mbulkhdr[ll]->ptr, hdr_len, skip_client_to_write_check);
return;
} else if (prefix == '$' && opt_hdr) {
addReplyProto(c, shared.bulkhdr[ll]->ptr, hdr_len);
addReplyProto(c, shared.bulkhdr[ll]->ptr, hdr_len, skip_client_to_write_check);
return;
} else if (prefix == '%' && opt_hdr) {
addReplyProto(c, shared.maphdr[ll]->ptr, hdr_len);
addReplyProto(c, shared.maphdr[ll]->ptr, hdr_len, skip_client_to_write_check);
return;
} else if (prefix == '~' && opt_hdr) {
addReplyProto(c, shared.sethdr[ll]->ptr, hdr_len);
addReplyProto(c, shared.sethdr[ll]->ptr, hdr_len, skip_client_to_write_check);
return;
}

buf[0] = prefix;
len = ll2string(buf + 1, sizeof(buf) - 1, ll);
buf[len + 1] = '\r';
buf[len + 2] = '\n';
addReplyProto(c, buf, len + 3);
addReplyProto(c, buf, len + 3, skip_client_to_write_check);
}

void addReplyLongLong(client *c, long long ll) {
Expand All @@ -998,12 +1006,12 @@ void addReplyLongLong(client *c, long long ll) {
else if (ll == 1)
addReply(c, shared.cone);
else
addReplyLongLongWithPrefix(c, ll, ':');
addReplyLongLongWithPrefix(c, ll, ':', 0);
}

void addReplyAggregateLen(client *c, long length, int prefix) {
serverAssert(length >= 0);
addReplyLongLongWithPrefix(c, length, prefix);
addReplyLongLongWithPrefix(c, length, prefix, 0);
}

void addReplyArrayLen(client *c, long length) {
Expand Down Expand Up @@ -1034,17 +1042,17 @@ void addReplyPushLen(client *c, long length) {

void addReplyNull(client *c) {
if (c->resp == 2) {
addReplyProto(c, "$-1\r\n", 5);
addReplyProto(c, "$-1\r\n", 5, 0);
} else {
addReplyProto(c, "_\r\n", 3);
addReplyProto(c, "_\r\n", 3, 0);
}
}

void addReplyBool(client *c, int b) {
if (c->resp == 2) {
addReply(c, b ? shared.cone : shared.czero);
} else {
addReplyProto(c, b ? "#t\r\n" : "#f\r\n", 4);
addReplyProto(c, b ? "#t\r\n" : "#f\r\n", 4, 0);
}
}

Expand All @@ -1054,38 +1062,40 @@ void addReplyBool(client *c, int b) {
* Null type "_\r\n". */
void addReplyNullArray(client *c) {
if (c->resp == 2) {
addReplyProto(c, "*-1\r\n", 5);
addReplyProto(c, "*-1\r\n", 5, 0);
} else {
addReplyProto(c, "_\r\n", 3);
addReplyProto(c, "_\r\n", 3, 0);
}
}

/* Create the length prefix of a bulk reply, example: $2234 */
void addReplyBulkLen(client *c, robj *obj) {
size_t len = stringObjectLen(obj);

addReplyLongLongWithPrefix(c, len, '$');
addReplyLongLongWithPrefix(c, len, '$', 0);
}

/* Add an Object as a bulk reply */
void addReplyBulk(client *c, robj *obj) {
addReplyBulkLen(c, obj);
addReply(c, obj);
addReplyProto(c, "\r\n", 2);
addReplyProto(c, "\r\n", 2, 0);
}

/* Add a C buffer as bulk reply */
void addReplyBulkCBuffer(client *c, const void *p, size_t len) {
addReplyLongLongWithPrefix(c, len, '$');
addReplyProto(c, p, len);
addReplyProto(c, "\r\n", 2);
if (prepareClientToWrite(c) != C_OK) return;
addReplyLongLongWithPrefix(c, len, '$', 1);
addReplyProto(c, p, len, 1);
addReplyProto(c, "\r\n", 2, 1);
}

/* Add sds to reply (takes ownership of sds and frees it) */
void addReplyBulkSds(client *c, sds s) {
addReplyLongLongWithPrefix(c, sdslen(s), '$');
addReplySds(c, s);
addReplyProto(c, "\r\n", 2);
if (prepareClientToWrite(c) != C_OK) return;
addReplyLongLongWithPrefix(c, sdslen(s), '$', 1);
addReplySds(c, s, 1);
addReplyProto(c, "\r\n", 2, 1);
}

/* Set sds to a deferred reply (for symmetry with addReplyBulkSds it also frees the sds) */
Expand Down Expand Up @@ -1137,9 +1147,10 @@ void addReplyVerbatim(client *c, const char *s, size_t len, const char *ext) {
p[i] = *ext++;
}
}
addReplyProto(c, buf, preflen);
addReplyProto(c, s, len);
addReplyProto(c, "\r\n", 2);
if (prepareClientToWrite(c) != C_OK) return;
addReplyProto(c, buf, preflen, 1);
addReplyProto(c, s, len, 1);
addReplyProto(c, "\r\n", 2, 1);
}
}

Expand Down Expand Up @@ -1210,7 +1221,7 @@ void AddReplyFromClient(client *dst, client *src) {
}

/* First add the static buffer (either into the static buffer or reply list) */
addReplyProto(dst, src->buf, src->bufpos);
addReplyProto(dst, src->buf, src->bufpos, 0);

/* We need to check with prepareClientToWrite again (after addReplyProto)
* since addReplyProto may have changed something (like CLIENT_CLOSE_ASAP) */
Expand Down
2 changes: 1 addition & 1 deletion src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -3089,7 +3089,7 @@ void replicaofCommand(client *c) {
"with the primary we are already connected "
"with. No operation performed.");
addReplySds(c, sdsnew("+OK Already connected to specified "
"master\r\n"));
"master\r\n"), 0);
return;
}
/* There was no previous primary or the user specified a different one,
Expand Down
2 changes: 1 addition & 1 deletion src/sentinel.c
Original file line number Diff line number Diff line change
Expand Up @@ -3900,7 +3900,7 @@ NULL
addReplySds(c, sdscatfmt(sdsempty(),
"+OK %i usable Sentinels. Quorum and failover authorization "
"can be reached\r\n",
usable));
usable), 0);
} else {
sds e = sdscatfmt(sdsempty(), "-NOQUORUM %i usable Sentinels. ", usable);
if (result & SENTINEL_ISQR_NOQUORUM)
Expand Down
6 changes: 3 additions & 3 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -2644,15 +2644,15 @@ void addReplyNull(client *c);
void addReplyNullArray(client *c);
void addReplyBool(client *c, int b);
void addReplyVerbatim(client *c, const char *s, size_t len, const char *ext);
void addReplyProto(client *c, const char *s, size_t len);
void addReplyProto(client *c, const char *s, size_t len, int skip_client_to_write_check);
void AddReplyFromClient(client *c, client *src);
void addReplyBulk(client *c, robj *obj);
void addReplyBulkCString(client *c, const char *s);
void addReplyBulkCBuffer(client *c, const void *p, size_t len);
void addReplyBulkLongLong(client *c, long long ll);
void addReply(client *c, robj *obj);
void addReplyStatusLength(client *c, const char *s, size_t len);
void addReplySds(client *c, sds s);
void addReplySds(client *c, sds s, int skip_client_to_write_check);
void addReplyBulkSds(client *c, sds s);
void setDeferredReplyBulkSds(client *c, void *node, sds s);
void addReplyErrorObject(client *c, robj *err);
Expand All @@ -2667,7 +2667,7 @@ void addReplyErrorArity(client *c);
void addReplyErrorExpireTime(client *c);
void addReplyStatus(client *c, const char *status);
void addReplyDouble(client *c, double d);
void addReplyLongLongWithPrefix(client *c, long long ll, char prefix);
void addReplyLongLongWithPrefix(client *c, long long ll, char prefix, int skip_client_to_write_check);
void addReplyBigNum(client *c, const char *num, size_t len);
void addReplyHumanLongDouble(client *c, long double d);
void addReplyLongLong(client *c, long long ll);
Expand Down
2 changes: 1 addition & 1 deletion src/tracking.c
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ void sendTrackingMessage(client *c, char *keyname, size_t keylen, int proto) {

/* Send the "value" part, which is the array of keys. */
if (proto) {
addReplyProto(c, keyname, keylen);
addReplyProto(c, keyname, keylen, 0);
} else {
addReplyArrayLen(c, 1);
addReplyBulkCBuffer(c, keyname, keylen);
Expand Down

0 comments on commit 3ec1437

Please sign in to comment.