minor pulsar bugfixes

This commit is contained in:
Jonathan Shook 2021-02-05 13:28:30 -06:00
parent 6b6afdc29c
commit 48dce82591
3 changed files with 19 additions and 11 deletions

View File

@ -21,7 +21,11 @@ public class PulsarRecvOp implements PulsarOp {
Message<byte[]> msgbytes = consumer.receive();
// TODO: Parameterize the actions taken on a received message
// TODO: Properly parameterize all pulsar Op types as with Producer<T> and Consumer<T>
System.out.println("received:" + new String(msgbytes.getValue(), StandardCharsets.UTF_8));
String received = new String(msgbytes.getValue(), StandardCharsets.UTF_8);
System.out.print("received:" + received);
if (!received.endsWith("\n")) {
System.out.println("\n");
}
} catch (PulsarClientException e) {
throw new RuntimeException(e);
}

View File

@ -20,6 +20,7 @@ public class ReadyPulsarOp implements LongFunction<PulsarOp> {
// TODO: Add docs for the command template with respect to the OpTemplate
public ReadyPulsarOp(OpTemplate opTemplate, PulsarSpaceCache pcache) {
// TODO: Consider parsing map structures into equivalent binding representation
this.cmdTpl = new CommandTemplate(opTemplate);
this.pcache = pcache;
if (cmdTpl.isDynamic("op_scope")) {
@ -54,12 +55,7 @@ public class ReadyPulsarOp implements LongFunction<PulsarOp> {
} else {
topic_uri_func = (l) -> cmdTpl.getDynamic("topic_uri", l);
}
} else if (
!cmdTpl.isDynamic("persistence") // optimize topic around static piece-wise values
&& !cmdTpl.isDynamic("tenant")
&& !cmdTpl.isDynamic("namespace")
&& !cmdTpl.isDynamic("topic")
) {
} else if (cmdTpl.isStaticOrUnsetSet("persistence", "tenant", "namespace", "topic")) {
String persistence = cmdTpl.getStaticOr("persistence", "persistent")
.replaceAll("true", "persistent");
String tenant = cmdTpl.getStaticOr("tenant", "public");
@ -69,10 +65,10 @@ public class ReadyPulsarOp implements LongFunction<PulsarOp> {
topic_uri_func = (l) -> composited;
} else { // some or all dynamic fields, composite into a single dynamic call
topic_uri_func = (l) ->
cmdTpl.getDynamicOr("persistent", l, "persistent").replaceAll("true", "persistent")
+ "://" + cmdTpl.getDynamicOr("tenant", l, "public")
+ "/" + cmdTpl.getDynamicOr("namespace", l, "default")
+ "/" + cmdTpl.getDynamicOr("topic", l, "default");
cmdTpl.getOr("persistent", l, "persistent").replaceAll("true", "persistent")
+ "://" + cmdTpl.getOr("tenant", l, "public")
+ "/" + cmdTpl.getOr("namespace", l, "default")
+ "/" + cmdTpl.getOr("topic", l, "default");
}

View File

@ -283,4 +283,12 @@ public class CommandTemplate {
return false;
}
public boolean isStaticOrUnsetSet(String... varnames) {
for (String varname : varnames) {
if (isDynamic(varname)) {
return false;
}
}
return true;
}
}